I'm running a job on Spark 1.2.0 in standalone mode.
The first operation in the job takes an RDD of folder paths and produces an RDD of file names, made up of the files residing in each of those folders:
JavaRDD<String> filePaths = paths.mapPartitions(new FoldersToFiles()).repartition(defaultPartitions);
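For context, here is a minimal sketch of how this stage could be wired up end to end (the folder URIs, app name, and defaultPartitions value below are hypothetical, not taken from my actual job):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class JobSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("folders-to-files"));

        // Hypothetical input: an RDD of folder URIs
        JavaRDD<String> paths = sc.parallelize(Arrays.asList(
                "hdfs:///data/folder-a", "hdfs:///data/folder-b"));
        int defaultPartitions = 4; // hypothetical value

        // Expand each folder into the file paths it contains, then rebalance
        JavaRDD<String> filePaths = paths
                .mapPartitions(new FoldersToFiles())
                .repartition(defaultPartitions);

        System.out.println("files found: " + filePaths.count());
        sc.stop();
    }
}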
The inner implementation of the FoldersToFiles class (which, given the mapPartitions signature, implements Spark's FlatMapFunction<Iterator<String>, String>) is:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FoldersToFiles implements FlatMapFunction<Iterator<String>, String> {

    // Logger assumed to be slf4j; the original snippet only shows "log"
    private static final Logger log = LoggerFactory.getLogger(FoldersToFiles.class);

    @Override
    public Iterable<String> call(Iterator<String> pathsIterator) throws Exception {
        List<String> filesPath = new ArrayList<String>();
        if (pathsIterator != null) {
            while (pathsIterator.hasNext()) {
                try {
                    // List the files directly under the current folder
                    String currFolder = pathsIterator.next();
                    Path currPath = new Path(currFolder);
                    FileSystem fs = FileSystem.get(currPath.toUri(), new Configuration(true));
                    FileStatus[] files = fs.listStatus(currPath);
                    List<FileStatus> filesList = Arrays.asList(files);
                    List<String> filesPathsStr = new Utils().convertFileStatusToPath(filesList);
                    filesPath.addAll(filesPathsStr);
                } catch (Exception e) {
                    log.error("Error during file names extraction: " + e.getMessage());
                }
            }
        }
        // filesPath is never null here, so only the empty case needs a warning
        if (filesPath.isEmpty()) {
            log.warn("Files path list is empty! Given path iterator is: " + pathsIterator);
        }
        return filesPath;
    }
}
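To keep this self-contained, the function can also be exercised locally, completely outside Spark (a quick sketch; the folder paths below are placeholders):

import java.util.Arrays;
import java.util.List;

public class LocalCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder folders on the local filesystem
        List<String> folders = Arrays.asList("file:///tmp/in-a", "file:///tmp/in-b");
        Iterable<String> files = new FoldersToFiles().call(folders.iterator());
        for (String f : files) {
            System.out.println(f);
        }
    }
}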
When running the job on the cluster, I get the following error:
520983 [task-result-getter-1] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 33.0 in stage 1.0 (TID 1033, hadoop-w-8.c.taboola-qa-01.internal): java.lang.NullPointerException
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:140)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:140)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
So the error is not directly in my code. However, looking at the relevant lines of the Spark source:
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 */
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
  def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
  JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
(Line 140, where the exception occurs, is the first line of the method body, the one defining fn.)
It is probably related to the code above (it is indeed the first mapPartitions in my job, so that would make sense), but I can't understand why.
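Reading only the snippet above, one scenario I can see (an assumption on my part, not a confirmed diagnosis) that would make line 140 throw is the FlatMapFunction's call() returning null: fn immediately invokes .iterator() on the result, so a null return would produce an NPE inside JavaRDDLike rather than in user code. A minimal sketch that should reproduce the same stack trace:

import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class NullReturnRepro {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("npe-repro").setMaster("local[2]"));

        JavaRDD<String> broken = sc.parallelize(Arrays.asList("a", "b"))
                .mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                    @Override
                    public Iterable<String> call(Iterator<String> it) {
                        // fn does f.call(...).iterator(), so returning null
                        // blows up inside JavaRDDLike, not in user code
                        return null;
                    }
                });

        broken.count(); // java.lang.NullPointerException at JavaRDDLike.scala:140
        sc.stop();
    }
}

My own call() always returns a non-null list, though, which is why I still don't see how this applies to my case.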