
I am running a job on Spark 1.2.0 in standalone mode.

The first operation I perform takes an RDD of folder paths and produces an RDD of file names, consisting of the files residing in each of those folders:

JavaRDD<String> filePaths = paths.mapPartitions(new FoldersToFiles()).repartition(defaultPartitions);
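For context, paths is an RDD of folder path strings. A minimal sketch of how it could be built (the variable names and folder paths here are assumptions for illustration, not from my actual job):

    // Hypothetical setup: parallelize a list of folder paths into an RDD,
    // where sc is an org.apache.spark.api.java.JavaSparkContext.
    List<String> folders = Arrays.asList("hdfs:///data/2015/01/14", "hdfs:///data/2015/01/15");
    JavaRDD<String> paths = sc.parallelize(folders, defaultPartitions);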

The internal implementation of the FoldersToFiles class is:

// Implements FlatMapFunction<Iterator<String>, String> (Spark 1.x Java API);
// uses org.apache.hadoop.fs.FileSystem/FileStatus/Path and org.apache.hadoop.conf.Configuration.
@Override
public Iterable<String> call(Iterator<String> pathsIterator) throws Exception {
    List<String> filesPath = new ArrayList<String>();
    if (pathsIterator != null) { // Spark should never pass a null iterator, but guard anyway
        while (pathsIterator.hasNext()) {
            try {
                // List the files residing in the current folder
                String currFolder = pathsIterator.next();
                Path currPath = new Path(currFolder);
                FileSystem fs = FileSystem.get(currPath.toUri(), new Configuration(true));
                FileStatus[] files = fs.listStatus(currPath);
                List<FileStatus> filesList = Arrays.asList(files);
                List<String> filesPathsStr = new Utils().convertFileStatusToPath(filesList);
                filesPath.addAll(filesPathsStr);
            } catch (Exception e) {
                log.error("Error during file names extraction: " + e.getMessage());
            }
        }
    }
    if (filesPath.isEmpty()) { // filesPath is never null here, only possibly empty
        log.warn("Files path list is empty! Given path iterator is: " + pathsIterator);
    }
    return filesPath; // never null, so Spark can safely call .iterator() on it
}

When running the job on the cluster, I get the following error:

520983 [task-result-getter-1] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 33.0 in stage 1.0 (TID 1033, hadoop-w-8.c.taboola-qa-01.internal): java.lang.NullPointerException
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:140)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:140)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

So the error is not directly in my code. However, looking at the relevant lines in the Spark source:

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
    JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
  }

(Line 140, where the exception occurs, is the first line of the body, i.e. the def fn = ... line.)

It may well be related to the code I quoted above (this is in fact the first mapPartitions in my job, so that would make sense), but I don't understand why.
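One way that exact line can throw an NPE (a hypothetical sketch, not necessarily what happens in my job): fn evaluates asScalaIterator(f.call(asJavaIterator(x)).iterator()), so a call implementation that returns null instead of an empty collection would fail right there:

    // Hypothetical reproduction (not my actual code): a FlatMapFunction that
    // returns null from call(); Spark 1.x then invokes .iterator() on that
    // null at JavaRDDLike.scala:140 and throws a NullPointerException.
    JavaRDD<String> broken = paths.mapPartitions(
            new FlatMapFunction<Iterator<String>, String>() {
                @Override
                public Iterable<String> call(Iterator<String> it) {
                    return null; // should be Collections.<String>emptyList()
                }
            });

My call above always returns a non-null list, though, so this particular pitfall should not apply.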


1 Answer


Just a hunch: maybe the FoldersToFiles class needs to be declared static (if it is a private inner class)?
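A minimal sketch of what that would look like, assuming FoldersToFiles is currently a non-static inner class of the driver class (the surrounding class name here is made up):

    public class MyJobDriver {

        // Declared static: a non-static inner class keeps an implicit reference
        // to its enclosing instance, which must then be serialized along with
        // the function and shipped to the executors.
        private static class FoldersToFiles
                implements FlatMapFunction<Iterator<String>, String> {

            @Override
            public Iterable<String> call(Iterator<String> pathsIterator) throws Exception {
                // ... same body as in the question ...
                return new ArrayList<String>();
            }
        }
    }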

answered 2015-01-15T12:55:23.137