14

我已经在 Spark 的独立模式下使用 ec2-script 启动了一个 10 节点集群。我正在从 PySpark shell 中访问 s3 存储桶中的数据,但是当我在 RDD 上执行转换时,只使用了一个节点。例如,下面将从 CommonCorpus 中读取数据:

bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/"
          "/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10"
          "-180-212-248.ec2.internal.warc.gz")

data = sc.textFile(bucket)
data.count()

当我运行它时,我的 10 个从站中只有一个处理数据。我知道这一点,因为从 Spark Web 控制台查看时,只有一个从站 (213) 有任何活动日志。当我查看 Ganglia 中的活动时,这个相同的节点 (213) 是唯一一个在活动运行时内存使用量激增的从站。在此处输入图像描述

此外,当我使用只有一个从属设备的 ec2 集群运行相同的脚本时,我具有完全相同的性能。我正在使用 Spark 1.1.0,非常感谢任何帮助或建议。

4

1 回答 1

18

...ec2.internal.warc.gz

我认为您遇到了 gzip 压缩文件的一个相当典型的问题,即它们无法并行加载。更具体地说,单个 gzip 压缩文件不能由多个任务并行加载,因此 Spark 将使用 1 个任务加载它,从而为您提供一个具有 1 个分区的 RDD。

(但请注意,Spark 可以并行加载 10 个 gzip 压缩文件;只是这 10 个文件中的每一个只能由 1 个任务加载。您仍然可以获得文件的并行性,而不是一个文件中。)

您可以通过明确检查 RDD 中的分区数来确认您只有 1 个分区:

data.getNumPartitions()

RDD 上可以并行运行的任务数的上限是 RDD 中的分区数或集群中的从属内核数,以较低者为准。

在您的情况下,它是 RDD 分区的数量。您可以通过重新分区您的 RDD 来增加它,如下所示:

data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3)

为什么sc.defaultParallelism * 3

Spark Tuning 指南建议每个核心有 2-3 个任务,并sc.defaultParalellism为您提供集群中的核心数量。

于 2014-12-24T04:58:20.450 回答