1

我正在尝试在处理从谷歌云存储读取的大量数据(2TB)的纱线模式下运行作业。我的管道适用于 10GB 的数据。我的集群的规格和我的管道开始在这里详细说明: PySpark Yarn Application failed on groupBy

这是管道的其余部分:

      input.groupByKey()\
      [...] processing on sorted groups for each key shard
      .mapPartitions(sendPartition)\
      .map(mergeShardsbyKey)
      .reduceByKey(lambda list1, list2: list1 + list2).take(10)
      [...] output

应用于分区的映射函数如下:

def sendPartition(iterator):
    pool = external_service_connection_pool()
    return [make_request(record, pool) for record in iterator]

def make_request(record, pool):
    [...] tags a record based on query results from the external service
    return key, taggedrecord

在整个数据集上,执行失败的原因是:

java.lang.OutOfMemoryError: Java heap space

我试图获取更多信息,但我reduceByKey已经mapPartitions看到在在减少之前,作业不会分成多个任务)

我的问题如下:为什么它只在 1 个执行器上运行?即使描述该功能的文档似乎符合我的想法mapPartitionshttp://spark.apache.org/docs/latest/programming-guide.html),这是失败还是应该在此之后工作groupByKey

编辑:我尝试使用较小数据集的较小集群,即使成功,也只有一个执行程序用于处理 groupByKey 之后的所有数据。此外,每个阶段之后都有多个分区,当我一个一个启动阶段时,groupByKey阶段在界面上的每个阶段都被标记为“待定”。

4

0 回答 0