我是 spark 新手,没有 Java 编程经验。我正在使用 pyspark 处理一个非常大的时间序列数据集,其中包含接近 4000 个数字(浮点)列和数十亿行。
我想用这个数据集实现以下目标:
时间序列数据以 10 毫秒为间隔。我想按 1s 间隔对数据进行分组,并使用均值作为聚合函数。
这是我用来读取分区镶木地板文件的代码。
df = (spark.read.option("mergeSchema", "true")
.parquet("/data/"))
这是我编写的 groupby 和聚合代码:
col_list = [... list of numeric columns in the dataframe ...]
agg_funcs = [mean] # I also want to add other aggregation functions here later.
exprs = [f(df[c]).alias(f.__name__ + '_' + c) for f in agg_funcs for c in col_list]
result = (df.groupBy(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'])
.agg(*exprs))
现在,我想将上述结果数据帧写入分区镶木地板:
(result.write.mode("overwrite")
.partitionBy('Year', 'Month', 'Day', 'Hour', 'Minute', 'Second')
.parquet('/out/'))
但是,我得到一个 java 堆内存不足错误。
我尝试增加 spark.sql.shuffle.partitions
以使每个分区的大小更小,但这没有帮助。
我的火花集群配置:
2 workers + 1 master
Both the worker nodes have 256 GB RAM and 32 cores each.
Master node has 8 cores and 32 GB RAM.
我为我的 spark 作业指定的配置是:
{
"driverMemory": "8G",
"driverCores": 4,
"executorMemory": "20G",
"executorCores": 4,
"numExecutors": 14,
"conf": {
"spark.sql.shuffle.partitions": 2000000
}
}
以下是 Ambari 关于集群配置的一些截图:
有人可以帮我理解为什么会出现内存问题以及如何解决吗?谢谢。