1

我一直在对在 Dataproc 集群上运行的 pySpark 作业进行基准测试,并注意到处理时间的顽固“地板”——尽管有几种不同的集群配置。我想知道这是否是由于 gs: storage 和 Dataproc 之间的文件 xfer 延迟?

源文件为 60G,存储在与我的 dataproc 集群相同的区域(us-central1)下的同一项目下的存储桶中。该文件有 14.3 亿行,有 7.31 亿条记录,每条记录 17 个字段,全部在一行中。除了标题行之外,额外的行都是空白的。

在 1 个主设备、4 个工作人员的配置中,所有设备都是带有 300GB 磁盘的 n-standard-8 机器,运行时间是 35:20 和 36:33。当我将集群加强到 8 个工作人员而不是 4 个(仍然都是 n-standard-8)时,它下降到 21:14。接下来,我将 wkrs 更改为 n-highmem-32s 的 4 个,同时将 mstr 保持在 n-standard-8,时钟在 20:01 最后,我真的加强了,切换到 1 mstr 和 16 wkrs,所有 n -highmem-32。这次跑步的最佳时间是 15:36

这是我所有测试/配置等的结果:

测试结果

我运行了其他测试,对脚本中的缓存稍作更改,但没有一个比上述更好。

这让我认为 60G 文件的初始 xfer 是一个主要因素。您希望这样的 xfer 需要多长时间 - 考虑到它都在 GCP 内,在同一个项目下,在同一个地区?需要10分钟吗?

如果答案在这里,我还包括 pySpark 脚本:

在此处输入图像描述

4

1 回答 1

0

一般来说,只要分区得到合理细粒度的计算,文件大小就不应该存在瓶颈。底层InputFormat预计会将 60GB 文件分割成大量较小的字节范围,大小约为 64MB,每个字节范围将由不同的工作任务独立读取。您应该检查Spark Web 界面以可视化正在运行的任务数量、它们的时间、每个任务的最小/最大时间、每个工作人员处理的字节量等,以查看数据拆分中是否存在一些偏差或其他原因一些工作人员处于空闲状态,不参与文件的读取。

如果由于某种原因分区的数量很少,您可以尝试调整minPartitions调用SparkContext.textFile(path, minPartitions)的参数。

于 2016-03-24T18:04:59.247 回答