我正在处理一个用例,我必须处理大量数据(多个表),并且我试图将其作为批处理作业提交给 Dataproc 集群(PySpark)。
我的代码看起来像这样
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
def readconfig():
#code to read a yaml file
def func(filename, tabname):
sc = SparkContext("local", "First App")
sqlContext = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()
df1= read from file-filename as rdd using sqlcontext
df2= read from bigquery-tabname as df using spark
.
op = intermediate processing
.
#caching and unpersisting 2 dfs
.
op.write.csv(write multiple files in gcs bucket)
sc.stop()
spark.stop()
print("one pair of table and file processed")
if __name__ == "__main__":
config= readconfig()
for i,j in config.items():
func(i,j):
由于文件很大,我试图为SparkSession
正在处理的每对文件和表创建一个单独的文件。它工作正常,我能够处理大量表格。后来我开始收到关于节点内存问题的警告,最后一个错误说:
节点资源不足。无法创建 SparkSession。
为什么在关闭 a 时会发生这种情况SparkSession
应该从以前的迭代中释放数据的内存?