5

我一直在试图弄清楚当我遍历镶木地板文件和几个后处理功能时,如何防止 Spark 由于内存问题而崩溃。对大量文本感到抱歉,但这并不完全是一个特定的错误(我正在使用 PySpark。)如果这破坏了正确的 Stack Overflow 形式,我们深表歉意!

基本伪代码为:

#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its  "=" subdirectory
for counter in fileNums:
  sparkDataFrame = sqlContext.read.parquet(counter)
  summaryReportOne = sqlContext.sql.("SELECT.....")
  summaryReportOne.write.partition("id").parquet("/")
  summaryReportTwo = sqlContext.sql.("SELECT....")
  summaryReportTwo.write.partition("id").parquet("/")
  #several more queries, several involving joins, etc....

此代码使用 spark SQL 查询,因此我未能成功创建包含所有 SQL 查询/函数的包装函数并将其传递给 foreach(它不能将 sparkContext 或 sqlQuery 作为输入)而不是标准环形。

从技术上讲,这是一个有分区的大型 parquet 文件,但一次读取并查询它的规模太大了;我需要在每个分区上运行这些功能。所以我只是在 PySpark 中运行一个常规的 python 循环,在每个循环中,我处理一个 parquet 分区(子目录)并编写相关的输出报告。

由于整个镶木地板文件的大小,不确定是否将所有代码包装在一个大 mapPartition() 周围是否可行?

但是在几个循环之后,脚本由于内存错误而崩溃 - 特别是 Java 堆错误。(我已经确认循环崩溃的文件没有什么特别之处;它发生在第二个或第三个循环中读取的任何随机文件中。)

Caused by: com.google.protobuf.ServiceException:     
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at    org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space

我意识到 Spark 并不意味着要在循环中运行,但是这些 SQL 查询对于标准的 Spark SQL 打包函数来说有点太复杂了,我们为每个文件编写了多个关于不同聚合统计信息的汇总报告。

有没有办法在每个循环索引结束时基本上清除内存?使用 sqlContext.dropTempTable() 删除任何已注册的临时表并使用 sqlContext.clearCache() 清除缓存都没有帮助。如果我尝试停止 sparkContext 并在每个循环中重新启动它,我也会收到错误,因为某些进程尚未“结束”(看起来您曾经能够“优雅地”停止上下文,但我在当前的 PySpark 文档中找不到这个。)

我还应该注意,在完成处理后,我没有在循环中的数据帧上调用 unpersist(),但我也没有在它们上调用 persist();我只是在每个循环中重写数据帧(这可能是问题的一部分)。

我正在与我们的工程团队一起调整内存设置,但我们知道我们已经分配了足够多的内存来完成这个脚本的一个循环(并且一个循环确实运行没有任何错误)。

任何建议都会有所帮助 - 包括可能比 Spark 更适合此用例的工具。我使用的是 Spark 1.6.1 版。

4

2 回答 2

1

如果可以,请尝试升级到新发布的 spark 2.0。

我在 Java 堆空间中遇到了与您非常相似的问题。通过简单地重复创建数据帧的过程并使用 spark 1.6.2 一遍又一遍地调用,我能够超过 4G 的堆空间。

在使用 SparkSession 的 spark 2.0 中,同一个程序仅获得 1.2 GB 的堆空间,并且内存使用情况与我正在运行的那个程序的预期非常一致。

于 2016-08-08T15:11:40.440 回答
1

更新:如果我在每个循环中完成后对我从 sql 查询创建的每个表调用 unpersist(),则循环可以成功地继续到下一次迭代,而不会出现内存问题。如上所述,.clearCache() 和单独删除临时表并没有解决问题。我猜这很有效,因为虽然这些表来自 sparkSQL 查询,但它返回一个 RDD。

尽管我没有对这些 RDD 调用persist(),但我必须告诉 Spark 在下一个循环开始之前清除它们,以便我可以将新的 SQL 查询分配给这些相同的变量名。

于 2016-05-20T18:49:06.797 回答