6

我正在使用 spark 处理数据,它适用于一天的数据(40G),但在一周的数据上出现OOM失败:

import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
          .map(lambda row:(row.id, row.foo))
          for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")

不同ID的数量少于10k。每个 ID 都很小int。作业失败是因为太多的执行者因 OOM 而失败。当工作成功时(在小输入上),"myoutput"大约是 100k。

  1. 我究竟做错了什么?
  2. 我尝试替换saveAsTextFilecollect(因为我实际上想在保存之前在 python 中进行一些切片和切块),行为没有区别,同样的失败。这是可以预料的吗?
  3. 我曾经有reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])而不是sc.union-哪个更好?它有什么不同吗?

该集群有25个节点,其中825GB RAM 和224个内核。

调用是spark-submit --master yarn --num-executors 50 --executor-memory 5G.

单个 RDD 有大约 140 列并涵盖一小时的数据,因此一周是 168(=7*24) 个 RDD 的并集。

4

2 回答 2

2

原来问题不在于火花,而在于纱线。解决方案是运行火花

spark-submit --conf spark.yarn.executor.memoryOverhead=1000

(或修改纱线配置)。

于 2015-03-27T14:05:57.110 回答
2

Spark 在扩展时经常遇到 Out-Of-Memory 错误。在这些情况下,程序员应该进行微调。或者重新检查你的代码,确保你没有做太多的事情,比如收集驱动程序中的所有大数据,这很可能超过限制,无论你设置多大

要了解正在发生的事情,您应该意识到当决定因超出内存限制而终止容器时。当容器超出memoryOverhead限制时,就会发生这种情况。

在调度程序中,您可以检查事件时间线以查看容器发生了什么。如果 Yarn 杀死了一个容器,它将显示为红色,当您将鼠标悬停/单击它时,您将看到如下消息:

因超出内存限制而被 YARN 杀死的容器。使用了 16.9 GB 的 16 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead。

在此处输入图像描述


因此,在这种情况下,您要关注的是这些配置属性(值是集群上的示例):

# More executor memory overhead
spark.yarn.executor.memoryOverhead          4096

# More driver memory overhead
spark.yarn.driver.memoryOverhead            8192

# Max on my nodes
#spark.executor.cores                        8
#spark.executor.memory                       12G

# For the executors
spark.executor.cores                        6
spark.executor.memory                       8G

# For the driver
spark.driver.cores                          6
spark.driver.memory                         8G

首先要做的是增加memoryOverhead.

在驱动程序或执行程序中?

当您从 UI 中查看集群时,您可以单击尝试 ID 并检查诊断信息,其中应提及被杀死的容器的 ID。如果它与您的AM Container相同,则它是驱动程序,否则是执行程序。


这并没有解决问题,现在怎么办?

您必须微调您提供的核心数量和堆内存。您会看到将在堆外内存中完成大部分工作,因此您不希望为堆提供太多空间,因为那样会被浪费。你不想给的太少,因为垃圾收集器会出现问题。回想一下,这些是 JVM。

如此处所述,一个工作线程可以托管多个执行程序,因此使用的核心数量会影响每个执行程序的内存量,因此减少#cores 可能会有所帮助。

我把它写在Spark和 Spark中的memoryOverhead 问题中——容器以非零退出代码 143 退出更详细,主要是我不会忘记的!我没有尝试过的另一种选择是spark.default.parallelism or/and spark.storage.memoryFraction,根据我的经验,它没有帮助。


您可以像提到的 sds 那样传递配置标志,或者像这样:

spark-submit --properties-file my_properties

其中“my_properties”类似于我上面列出的属性。

对于非数值,您可以这样做:

spark-submit --conf spark.executor.memory='4G' 
于 2016-09-15T20:24:56.080 回答