我正在尝试在处理从谷歌云存储读取的大量数据(2TB)的纱线模式下运行作业。我的管道适用于 10GB 的数据。我的集群的规格和我的管道开始在这里详细说明: PySpark Yarn Application failed on groupBy
这是管道的其余部分:
input.groupByKey()\
[...] processing on sorted groups for each key shard
.mapPartitions(sendPartition)\
.map(mergeShardsbyKey)
.reduceByKey(lambda list1, list2: list1 + list2).take(10)
[...] output
应用于分区的映射函数如下:
def sendPartition(iterator):
pool = external_service_connection_pool()
return [make_request(record, pool) for record in iterator]
def make_request(record, pool):
[...] tags a record based on query results from the external service
return key, taggedrecord
在整个数据集上,执行失败的原因是:
java.lang.OutOfMemoryError: Java heap space
我试图获取更多信息,但我reduceByKey
已经mapPartitions
看到它在在减少之前,作业不会分成多个任务)
我的问题如下:为什么它只在 1 个执行器上运行?即使描述该功能的文档似乎符合我的想法mapPartitions
(http://spark.apache.org/docs/latest/programming-guide.html),这是失败还是应该在此之后工作groupByKey
?
编辑:我尝试使用较小数据集的较小集群,即使成功,也只有一个执行程序用于处理 groupByKey 之后的所有数据。此外,每个阶段之后都有多个分区,当我一个一个启动阶段时,groupByKey阶段在界面上的每个阶段都被标记为“待定”。