1

我正在使用 Spark 1.1。我有一个 Spark 作业,它仅在存储桶下寻找特定模式的文件夹(即以...开头的文件夹),并且应该只处理那些。我通过执行以下操作来实现这一点:

FileSystem fs = FileSystem.get(new Configuration(true));
FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
List<FileStatus> statusList = Arrays.asList(statusArr);

List<String> pathsStr = convertFileStatusToPath(statusList);

JavaRDD<String> paths = sc.parallelize(pathsStr);

但是,在 Google 云存储路径上运行此作业时:gs://rsync-1/2014_07_31*(使用最新的谷歌云存储连接器 1.2.9),我收到以下错误:

4/10/13 10:28:38 INFO slf4j.Slf4jLogger: Slf4jLogger started    
14/10/13 10:28:38 INFO util.Utils: Successfully started service 'Driver' on port 60379.    
14/10/13 10:28:38 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@hadoop-w-9.c.taboola-qa-01.internal:45212/user/Worker    
Exception in thread "main" java.lang.reflect.InvocationTargetException    
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)    
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    
    at java.lang.reflect.Method.invoke(Method.java:606)    
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)    
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)    
Caused by: java.lang.IllegalArgumentException: Wrong bucket: rsync-1, in path: gs://rsync-1/2014_07_31*, expected bucket: hadoop-config    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:100)    
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:294)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:457)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:163)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1052)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1027)    
    at com.doit.customer.dataconverter.Phase0.main(Phase0.java:578)    
... 6 more

当我在本地文件夹上运行此作业时,一切正常。

hadoop-config 是我用于在 Google Compute Engine 上部署 Spark 集群的存储桶(使用 bdutil 0.35.2 工具)

4

1 回答 1

6

简答

而不是使用:

    FileSystem fs = FileSystem.get(new Configuration(true));
    FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
    List<FileStatus> statusList = Arrays.asList(statusArr);

你需要做

    Path inputPathObj = new Path(inputPath);
    FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
    FileStatus[] statusArr = fs.globStatus(inputPathObj);
    List<FileStatus> statusList = Arrays.asList(statusArr);

因为在 Hadoop 中,FileSystem 实例是基于 URI 的schemeandauthority组件(以及更高级设置中的潜在用户组信息)共享的,并且此类实例在方案和权限之间不可互换。

长答案

这与[scheme]://[authority]/[path]中的 ahostnamepath组件之间的区别有关,这在 HDFS 用例中可能更明显,但也适用于 GCS。基本上,org.apache.hadoop.fs.FileSystem里面有几个方法,这里最适用的有:URIget

public static FileSystem get(Configuration conf)

public static FileSystem get(URI uri, Configuration conf)

前者实际上只是将后者称为:

    return get(getDefaultUri(conf), conf);

其中getDefaultUri(conf)fs.default.nameor定义fs.defaultFS。第二个考虑是具有不同hosthnameauthority组件的文件系统被认为是本质上不同的文件系统;在 HDFS 的情况下,这是有道理的,因为:

    FileSystem.get("hdfs://foo-cluster-namenode/", conf);
    FileSystem.get("hdfs://bar-cluster-namenode/", conf);

每个点都可能在不同的集群上完全不同的文件系统实例,允许在两个不同的 HDFS 实例上使用相同的路径名来引用不同的存储命名空间。尽管在机器的“主机名”方面不太透明,但bucketGCS 中的 GCS 确实充当authority了 GCE URI 的组件——在 Hadoop 中,这意味着当它们相同但实例不同时,FileSystem.get实际上返回相同的缓存 Java FileSystem 对象bucket对于不同的桶。就像您不能创建 HDFS 实例并将其指向不同的权限一样:

    // Can't mix authorities!
    FileSystem.get("hdfs://foo/", conf).listStatus(new Path("hdfs://bar/"));

当你打电话给你时,FileSystem.get(conf)你有效地得到了一个指向的缓存实例gs://hadoop-config/,然后用它来尝试列出gs://rsync-1

相反,当您知道要操作的路径时,应该是您获取 FileSystem 实例的时间:

    FileSystem fs = FileSystem.get(myPath.toUri(), new Configuration(true));
    fs.globStatus(myPath);
于 2014-10-20T00:07:14.353 回答