我正在使用 Spark 1.3 对大量数据进行聚合。这项工作包括 4 个步骤:
- 读取一个大(1TB)的序列文件(对应1天的数据)
- 过滤掉大部分并获得大约 1GB 的随机写入
- keyBy客户
- aggregateByKey() 到为该客户构建配置文件的自定义结构,对应于每个客户的 HashMap[Long, Float]。长键是唯一的,并且从不大于 50K 不同的条目。
我正在使用以下配置运行它:
--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \
并得到这个错误:
org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
...
Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
... 21 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
该作业和逻辑已被证明可以与一个小型测试集一起使用,我什至可以在某些日期运行该作业,但不能在其他日期运行。我搜索了一下,发现提示“与 MapOutputTracker 通信时出错”与内部 Spark 消息有关,但我已经增加了“spark.akka.frameSize”、“spark.akka.timeout”和“spark.akka.askTimeout”(最后一个甚至没有出现在 Spark 文档中,但在 Spark 邮件列表中被提及),但无济于事。在 30 秒时仍有一些超时,我不知道如何识别或修复。
我认为没有理由因为数据大小而失败,因为过滤操作和 aggregateByKey 执行本地部分聚合的事实应该足以解决数据大小问题。任务的数量是 16K(从原始输入自动),比在 100 个执行器上运行的 800 个内核要多得多,所以它不像通常的“增量分区”技巧那么简单。任何线索将不胜感激!谢谢!