我正在运行 Spark 作业来聚合数据。我有一个名为 Profile 的自定义数据结构,它基本上包含一个mutable.HashMap[Zone, Double]
. 我想使用以下代码合并所有共享给定密钥(UUID)的配置文件:
def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
.aggregateByKey(new Profile(), 3200)(merge, merge).cache()
奇怪的是,Spark 失败并出现以下错误:
org.apache.spark.SparkException:作业因阶段故障而中止:116318 个任务的序列化结果的总大小(1024.0 MB)大于 spark.driver.maxResultSize(1024.0 MB)
显而易见的解决方案是增加“spark.driver.maxResultSize”,但有两件事让我感到困惑。
- 我得到的 1024.0 大于 1024.0 太巧合了
- 我在谷歌上搜索此特定错误和配置参数时发现的所有文档和帮助表明它会影响将值返回给驱动程序的函数。(比如说
take()
或collect()
),但我没有向驱动程序带任何东西,只是从 HDFS 读取、聚合、保存回 HDFS。
有谁知道我为什么会收到这个错误?