1

我正在处理一个用例,我必须处理大量数据(多个表),并且我试图将其作为批处理作业提交给 Dataproc 集群(PySpark)。

我的代码看起来像这样

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

def readconfig():
   #code to read a yaml file

def func(filename, tabname):
   sc = SparkContext("local", "First App")
   sqlContext = SQLContext(sc)
   spark = SparkSession.builder.getOrCreate()
   df1= read from file-filename as rdd using sqlcontext
   df2= read from bigquery-tabname as df using spark
   .
   op = intermediate processing
   .
   #caching and unpersisting 2 dfs 
   .
   op.write.csv(write multiple files in gcs bucket)
   sc.stop()
   spark.stop()
   print("one pair of table and file processed")

if __name__ == "__main__":
   config= readconfig()
   for i,j in config.items():
      func(i,j):

由于文件很大,我试图为SparkSession正在处理的每对文件和表创建一个单独的文件。它工作正常,我能够处理大量表格。后来我开始收到关于节点内存问题的警告,最后一个错误说:

节点资源不足。无法创建 SparkSession。

为什么在关闭 a 时会发生这种情况SparkSession应该从以前的迭代中释放数据的内存?

4

1 回答 1

0

因为您将local值传递给构造函数master中的参数,所以您在单个 VM(Dataproc 主节点)上以本地部署模式运行应用程序。这就是为什么您不能在应用程序中处理大量数据的原因。SparkContext

要解决此问题,您应该使用无参数构造函数,该构造函数将从DataprocSparkContext()配置的属性中加载参数 - 在这种情况下,当您将应用程序提交到 Dataproc 集群时,您的应用程序将在 YARN 上运行,并且能够利用所有 Dataproc 集群资源/节点。

此外,您可能希望重构您的应用程序,以便在单个表中对所有表进行数据处理,SparkSession而不是创建每个表SparkSession- 如果操作正确,这应该更有效和可扩展。

于 2020-07-12T04:21:24.587 回答