1

从日志中我可以看到有 182k 行 70MB。在 Dataproc 上训练 182K 行需要 1.5 小时加载 70MB 数据和 9 小时(从 15/11/14 01:58:28 开始,到 15/11/14 09:19:09 结束)。在我的本地机器上加载相同的数据并运行相同的算法需要 3 分钟

数据处理日志

15/11/13 23:27:09 INFO com.google.cloud.hadoop.io.bigquery.ShardedExportToCloudStorage: Table 'mydata-data:website_wtw_feed.video_click20151111' to be exported has 182712 rows and 70281790 bytes
15/11/13 23:28:13 WARN akka.remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@rc-spark-poc-w-1.c.dailymotion-data.internal:60749] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 

15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Fetching the Ratings RDD
15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the video feature matrix
15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Training ALS Matrix factorization Model


[Stage 2:=============================>                             (1 + 1) / 2]

15/11/14 09:19:09 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/11/14 09:19:09 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

15/11/14 09:19:44 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the video feature matrix
15/11/14 09:19:44 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the user feature matrix
  1. 将数据复制到本地机器

    r.viswanadha$ gsutil cp -r gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000 .
    
    
    Copying gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json... 
    
    Downloading ...201511132327_0000/shard-0/data-000000000000.json: 141.3 MiB/141.3 MiB      
    
    Copying gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000001.json... 
    
    Copying gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/data-000000000000.json...`
    
  2. 运行相同的算法。ALS 训练步骤耗时约 3 分钟

    com.dailymotion.recommender.BigQueryRecommender --app_name BigQueryRecommenderTest --master local[4] --input_dir /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/input/job_201511132327_0000/shard-0/ 
    

第一次运行

15/11/14 13:19:36 INFO BigQueryRecommender: Training implicit features for the ALS Matrix factorization Model
...
15/11/14 13:22:24 INFO BigQueryRecommender: Transforming the video feature matrix

第二轮

15/11/14 13:29:05 INFO BigQueryRecommender: Training implicit features for the ALS Matrix factorization Model


...

15/11/14 13:31:57 INFO BigQueryRecommender: Transforming the video feature matrix

DataProc 集群有 1 个 Master 和 3 个 Slave,每个具有 104GB (RAM) 和 16 个 CPU。

我的本地机器有 8GB (RAM) 和 2 个 CPU 2.7GHz Core i5

gsutil ls -l -r -h  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/: 

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/: 

    0 B  2015-11-13T23:27:13Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/ 

    141.3 MiB  2015-11-13T23:29:21Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json 

   0 B  2015-11-13T23:29:21Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000001.json 

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/: 

    0 B  2015-11-13T23:27:13Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/ 

    0 B  2015-11-13T23:28:47Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/data-000000000000.json 

   0 B  2015-11-13T23:27:09Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/ 

TOTAL: 6 objects, 148165416 bytes (141.3 MiB)
4

2 回答 2

3

回顾一些离线发现,当分布式集群上的运行速度比本地设置慢几个数量级时,要寻找的主要瓶颈是跨网络服务方面的 I/O 往返延迟瓶颈依赖关系以及磁盘和本地 I/O。

一般要寻找的东西(其中一些可能适用于您的特定情况,也可能不适用于您的特定情况,但对于遇到类似问题的其他人可能很常见):

  1. 确保保存数据的 GCS 存储桶与您部署 Dataproc 集群的 GCE 区域位于同一区域gsutil ls -L gs://[your-bucket]。跨大陆的流量不仅明显变慢,而且可能会在您的项目中产生额外的网络成本。
  2. 如果您的工作有任何其他网络依赖项,例如查询 API 或在 GCE 上运行的某种单独的数据库,请尝试将它们放在同一个区域中;即使在同一个大陆,GCE 跨区域流量也可能有数十毫秒的往返延迟,这可能会显着增加,尤其是在发出每条记录请求的情况下(例如,30 毫秒 * 18 万条记录需要 1.5 小时) )。
  3. 尽管这一次可能不适用于您的特定情况,但请记住尽可能避免通过 Hadoop 文件系统接口到 GCS 的每条记录往返 I/O;GCS 的整体吞吐量非常可扩展,但由于远程存储的性质,往返延迟比您可能在本地机器上测量的往返延迟要慢得多,因为本地读取经常会命中 OS 缓冲区缓存,或者如果您正在使用带有 SSD 的笔记本电脑能够维持大量的亚毫秒往返,而与 GCS 的往返为 30 毫秒至 100 毫秒。

一般来说,对于可以支持非常高的吞吐量但会遭受长往返延迟的用例,如果数据很小并且不能自然地划分为足够的并行度,请确保使用repartition()之类的方法将数据分片确保充分利用 Spark 集群。

最后,我们最新的Dataproc 版本修复了一堆原生库配置,因此它可能会在 ALS 部分以及其他 mllib 用例中显示出更好的性能。

于 2015-11-19T23:01:28.857 回答
2

对于遇到类似情况的任何人:当只处理 GCS 中的单个小对象(或包含来自 BigQuery 连接器的数据的单个分片)时,您最终可能会在 Spark RDD 中得到一个分区,结果是很少或没有并行性。

虽然它会导致额外的 shuffle 阶段,但输入 RDD 可以在从 GCS 或 BigQuery 读取后立即重新分区,以获得所需的分区数量。额外的 shuffle 是否有益取决于 RDD 中的每条记录需要多少处理或 IO。

于 2015-11-19T22:59:47.953 回答