
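I'm trying to use PySpark on EMR to analyze some data stored as SequenceFiles on S3, but I'm running into performance problems related to data locality. Here is a very simple example that performs poorly: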

seqRDD = sc.sequenceFile("s3n://<access>:<secret>@<bucket>/<table>/day=2015-07-04/hour=*/*")
seqRDD.count()

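The problem is the count action. It returns the correct result, but the tasks are distributed poorly. For some reason, the Spark logs show only 2 of the cluster's IPs doing any actual work, while the rest sit idle. I have tried both a 5-node cluster and a 50-node cluster, and the logs always show just 2 IPs.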

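What is also strange is that the locality level for both of these IPs is RACK_LOCAL. I assume this is because the data lives in S3 and therefore isn't local to any node, but how can I get Spark to use the whole cluster instead of only 2 instances?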

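I haven't configured Spark on EMR in any special way. I just installed it on EMR as a native application, and I believe that tunes the configuration automatically.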

I noticed this in the logs. `allowLocal=false` might be the problem, but I couldn't find anything about it:


15/07/17 23:55:27 INFO spark.SparkContext: Starting job: count at :1
15/07/17 23:55:27 INFO scheduler.DAGScheduler: Got job 1 (count at :1) with 1354 output partitions (allowLocal=false)
15/07/17 23:55:27 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at :1)

Some of the log output while `count` runs, showing only 2 IPs:


15/07/17 23:55:28 INFO scheduler.DAGScheduler: Submitting 1354 missing tasks from Stage 1 (PythonRDD[3] at count at :1)
15/07/17 23:55:28 INFO cluster.YarnScheduler: Adding task set 1.0 with 1354 tasks
15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1418 bytes)
15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-172-31-36-179.ec2.internal:39998 (size: 3.7 KB, free: 535.0 MB)
15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-172-31-41-210.ec2.internal:36847 (size: 3.7 KB, free: 535.0 MB)
15/07/17 23:55:29 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-31-41-210.ec2.internal:36847 (size: 18.8 KB, free: 535.0 MB)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1421 bytes)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 3501 ms on ip-172-31-41-210.ec2.internal (1/1354)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 4, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) in 99 ms on ip-172-31-41-210.ec2.internal (2/1354)
15/07/17 23:55:33 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 1.0 (TID 5, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:33 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 5190 ms on ip-172-31-36-179.ec2.internal (3/1354)
15/07/17 23:55:36 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 1.0 (TID 6, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:36 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 4471 ms on ip-172-31-41-210.ec2.internal (4/1354)
15/07/17 23:55:37 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 1.0 (TID 7, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:37 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 3676 ms on ip-172-31-36-179.ec2.internal (5/1354)
15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 7.0 in stage 1.0 (TID 8, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:40 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 1.0 (TID 6) in 3895 ms on ip-172-31-41-210.ec2.internal (6/1354)
15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 8.0 in stage 1.0 (TID 9, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
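To confirm the "only 2 IPs" observation over a full driver log rather than eyeballing an excerpt, one option is to tally the `Starting task ... (TID n, <host>, <locality>, n bytes)` lines per host and locality level. The sketch below is a plain-Python helper, not part of Spark or EMR; the function name `tally_task_hosts` and the script usage are hypothetical, and the regex assumes the `TaskSetManager` line format shown in the excerpt above (other Spark versions may format it differently).

```python
import re
import sys
from collections import Counter

# Assumed format, taken from the log excerpt above, e.g.:
# "... Starting task 0.0 in stage 1.0 (TID 1, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1418 bytes)"
START_RE = re.compile(
    r"Starting task \S+ in stage \S+ "
    r"\(TID \d+, (?P<host>[^,]+), (?P<locality>[A-Z_]+), \d+ bytes\)"
)


def tally_task_hosts(log_lines):
    """Count launched tasks per (executor host, locality level).

    Only "Starting task" lines are counted; "Finished task" and all
    other log lines are ignored.
    """
    counts = Counter()
    for line in log_lines:
        match = START_RE.search(line)
        if match:
            counts[(match.group("host"), match.group("locality"))] += 1
    return counts


if __name__ == "__main__":
    # Hypothetical usage: python tally_task_hosts.py driver.log
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as log_file:
            counts = tally_task_hosts(log_file)
        for (host, locality), n in counts.most_common():
            print(f"{n:6d}  {locality:12s}  {host}")
```

Run against the excerpt above, it would report every task as RACK_LOCAL and split between just two hosts, which is the skew described in the question.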