我一直在试图弄清楚当我遍历镶木地板文件和几个后处理功能时,如何防止 Spark 由于内存问题而崩溃。对大量文本感到抱歉,但这并不完全是一个特定的错误(我正在使用 PySpark。)如果这破坏了正确的 Stack Overflow 形式,我们深表歉意!
基本伪代码为:
#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its "=" subdirectory
for counter in fileNums:
sparkDataFrame = sqlContext.read.parquet(counter)
summaryReportOne = sqlContext.sql.("SELECT.....")
summaryReportOne.write.partition("id").parquet("/")
summaryReportTwo = sqlContext.sql.("SELECT....")
summaryReportTwo.write.partition("id").parquet("/")
#several more queries, several involving joins, etc....
此代码使用 spark SQL 查询,因此我未能成功创建包含所有 SQL 查询/函数的包装函数并将其传递给 foreach(它不能将 sparkContext 或 sqlQuery 作为输入)而不是标准环形。
从技术上讲,这是一个有分区的大型 parquet 文件,但一次读取并查询它的规模太大了;我需要在每个分区上运行这些功能。所以我只是在 PySpark 中运行一个常规的 python 循环,在每个循环中,我处理一个 parquet 分区(子目录)并编写相关的输出报告。
由于整个镶木地板文件的大小,不确定是否将所有代码包装在一个大 mapPartition() 周围是否可行?
但是在几个循环之后,脚本由于内存错误而崩溃 - 特别是 Java 堆错误。(我已经确认循环崩溃的文件没有什么特别之处;它发生在第二个或第三个循环中读取的任何随机文件中。)
Caused by: com.google.protobuf.ServiceException:
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space
我意识到 Spark 并不意味着要在循环中运行,但是这些 SQL 查询对于标准的 Spark SQL 打包函数来说有点太复杂了,我们为每个文件编写了多个关于不同聚合统计信息的汇总报告。
有没有办法在每个循环索引结束时基本上清除内存?使用 sqlContext.dropTempTable() 删除任何已注册的临时表并使用 sqlContext.clearCache() 清除缓存都没有帮助。如果我尝试停止 sparkContext 并在每个循环中重新启动它,我也会收到错误,因为某些进程尚未“结束”(看起来您曾经能够“优雅地”停止上下文,但我在当前的 PySpark 文档中找不到这个。)
我还应该注意,在完成处理后,我没有在循环中的数据帧上调用 unpersist(),但我也没有在它们上调用 persist();我只是在每个循环中重写数据帧(这可能是问题的一部分)。
我正在与我们的工程团队一起调整内存设置,但我们知道我们已经分配了足够多的内存来完成这个脚本的一个循环(并且一个循环确实运行没有任何错误)。
任何建议都会有所帮助 - 包括可能比 Spark 更适合此用例的工具。我使用的是 Spark 1.6.1 版。