16

我正在使用 Spark 1.3 对大量数据进行聚合。这项工作包括 4 个步骤:

  1. 读取一个大(1TB)的序列文件(对应1天的数据)
  2. 过滤掉大部分并获得大约 1GB 的随机写入
  3. keyBy客户
  4. aggregateByKey() 到为该客户构建配置文件的自定义结构,对应于每个客户的 HashMap[Long, Float]。长键是唯一的,并且从不大于 50K 不同的条目。

我正在使用以下配置运行它:

--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \

并得到这个错误:

    org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        ...
    Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
        ... 21 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)

该作业和逻辑已被证明可以与一个小型测试集一起使用,我什至可以在某些日期运行该作业,但不能在其他日期运行。我搜索了一下,发现提示“与 MapOutputTracker 通信时出错”与内部 Spark 消息有关,但我已经增加了“spark.akka.frameSize”、“spark.akka.timeout”和“spark.akka.askTimeout”(最后一个甚至没有出现在 Spark 文档中,但在 Spark 邮件列表中被提及),但无济于事。在 30 秒时仍有一些超时,我不知道如何识别或修复。

我认为没有理由因为数据大小而失败,因为过滤操作和 aggregateByKey 执行本地部分聚合的事实应该足以解决数据大小问题。任务的数量是 16K(从原始输入自动),比在 100 个执行器上运行的 800 个内核要多得多,所以它不像通常的“增量分区”技巧那么简单。任何线索将不胜感激!谢谢!

4

3 回答 3

6

我有一个类似的问题,我的工作适用于较小的数据集,但会因较大的数据集而失败。

经过大量配置更改后,我发现更改驱动程序内存设置比更改执行程序内存设置影响更大。使用新的垃圾收集器也有很大帮助。我将以下配置用于 3 个集群,每个集群有 40 个核心。希望以下配置有所帮助:

spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -  
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g 
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions

spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g   
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions


spark.driver.memory=8g
spark.driver.cores=10
spark.driver.maxResultSize=8g

spark.executor.memory=16g
spark.executor.cores=25

spark.default.parallelism=50
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs
spark.eventLog.enabled=true

spark.kryoserializer.buffer=512m
spark.kryoserializer.buffer.max=1536m

spark.rdd.compress=true
spark.storage.memoryFraction=0.15
spark.storage.MemoryStore=12g
于 2016-01-20T19:04:37.400 回答
2

发生此故障时驱动程序发生了什么?这可能是由于驱动程序的内存压力导致其无响应。如果我没记错的话,它在调用 GetMapOutputStatuses 时尝试访问的 MapOutputTracker 正在 Spark 驱动程序驱动程序进程中运行。

如果您在该过程中由于某种原因面临长时间的 GC 或其他暂停,这将导致您在上面看到的异常。

当您开始看到这些错误并查看会发生什么时,可以尝试的一些事情是尝试 jstacking 驱动程序进程。如果 jstack 没有响应,则可能是您的驱动程序响应不足。

16K 的任务听起来对驱动程序来说确实很多,你有没有机会将驱动程序内存增加到 4g 以上?

于 2016-01-20T16:04:57.547 回答
0

尝试以下属性

spark.shuffle.reduceLocality.enabled = false。

请参阅此链接。 https://issues.apache.org/jira/browse/SPARK-13​​631

于 2016-11-16T14:25:59.587 回答