I am trying to iterate over a JavaPairRDD, apply some transformations to the values (the value is a Java model class; the key is a String), and return key-value pairs as the same kind of JavaPairRDD.
Before it throws the OutOfMemoryError, the log says: Marking Stage 5 (saveAsTextFile at AppDaoImpl.java:219) as failed due to a fetch failure from Stage 1 (mapToPair at AppDataUtil.java:221)
Is there any way to optimize the code below? It looks simple enough to me, but I am hitting this out-of-memory error whenever I process large files.
I am also passing the following parameters:
--num-executors 20 --executor-memory 12288M --executor-cores 5 --driver-memory 6G --conf spark.yarn.executor.memoryOverhead=1332
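For reference, a rough programmatic equivalent of those flags (a sketch only; in my job they come from the spark-submit command line, and the app name here is made up):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf()
        .setAppName("my-app") // hypothetical app name
        .set("spark.executor.instances", "20")
        .set("spark.executor.memory", "12288m")
        .set("spark.executor.cores", "5")
        // spark.driver.memory only takes effect if set before the driver
        // JVM starts (e.g. via spark-submit); shown here for completeness
        .set("spark.driver.memory", "6g")
        .set("spark.yarn.executor.memoryOverhead", "1332");
JavaSparkContext javaSparkCtx = new JavaSparkContext(sparkConf);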
The sample code is:
return parquetFileContent.mapToPair(tuple -> {
    // value of the incoming pair: one Parquet record
    SimpleGroup simpleGroup = tuple._2();
    // convert the Parquet record into the Java model class
    Model inputData = applyTransformationLogic(simpleGroup);
    // re-key the pair by a String field of the model
    return new Tuple2<String, Model>(inputData.getSomeStringField(), inputData);
});
Before calling saveAsTextFile(), I union three RDDs, each obtained by calling this method:
javaSparkCtx.union(rdd1, rdd2, rdd3).saveAsTextFile("hdfs filepath");
I want to write all the RDDs to the same location, which is why I am using union. Is it possible instead to write each RDD to the same location separately?
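For clarity, the alternative I am asking about would look roughly like this (the sub-directory names are made up; as far as I know, saveAsTextFile() fails if the target directory already exists, so each call would need its own path):

// Hypothetical alternative to union(): save each RDD on its own.
// saveAsTextFile() throws if the output directory already exists,
// so each RDD gets its own sub-directory under the same base path.
rdd1.saveAsTextFile("hdfs filepath/rdd1");
rdd2.saveAsTextFile("hdfs filepath/rdd2");
rdd3.saveAsTextFile("hdfs filepath/rdd3");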
The log trace is:
15/12/18 15:47:39 INFO scheduler.DAGScheduler: Marking Stage 5 (saveAsTextFile at AppDaoImpl.java:219) as failed due to a fetch failure from Stage 1 (mapToPair at AppDataUtil.java:221)
15/12/18 15:47:39 INFO scheduler.DAGScheduler: Stage 5 (saveAsTextFile at AppDaoImpl.java:219) failed in 78.951 s
15/12/18 15:47:39 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (mapToPair at AppDataUtil.java:221) and Stage 5 (saveAsTextFile at AppDaoImpl.java:219) due to fetch failure
15/12/18 15:47:39 INFO scheduler.DAGScheduler: Executor lost: 2 (epoch 5)
15/12/18 15:47:39 INFO storage.BlockManagerMasterActor: Trying to remove executor 2 from BlockManagerMaster.
15/12/18 15:47:39 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(2, lpdn0185.com, 37626)
15/12/18 15:47:39 INFO storage.BlockManagerMaster: Removed 2 successfully in removeExecutor
15/12/18 15:47:39 INFO scheduler.Stage: Stage 1 is now unavailable on executor 2 (26/56, false)
15/12/18 15:47:39 INFO scheduler.Stage: Stage 2 is now unavailable on executor 2 (25/56, false)
15/12/18 15:47:39 WARN scheduler.TaskSetManager: Lost task 2.1 in stage 5.0 (TID 119, lpdn0185.com): FetchFailed(BlockManagerId(2, lpdn0185.com, 37626), shuffleId=4, mapId=0, reduceId=2, message=
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/hdfs1/yarn/nm/usercache/phdpentcustcdibtch/appcache/application_1449986083135_60217/blockmgr-34a2e882-6b36-42c6-bcff-03d9bc5ef80b/2c/shuffle_4_0_0.data, offset=3038022, length=2959077}
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)