0

我正在尝试提高我的 Spark 应用程序代码对“收集”的理解,并且我正在处理以下代码:

  val triple = logData.map(x => x.split('@'))
                  .map(x => (x(1),x(0),x(2)))
                  .collect()
                  .sortBy(x => (x._1,x._2))
  val idx = sc.parallelize(triple)

基本上我正在创建一个 [String,String,String] RDD,其中包含一个不必要的(恕我直言)收集/并行化步骤(原始 RDD 中的 200k 个元素)。

Spark 指南说:“收集:在驱动程序中将数据集的所有元素作为数组返回。这通常在过滤或其他返回足够小的数据子集的操作之后很有用。”

顺便说一句:200k足够小吗?

我觉得这段代码应该“更轻”(没有收集并行):

  val triple = logData.map(x => x.split('@'))
                  .map(x => (x(1),x(0),x(2)))
                  .sortBy(x => (x._1,x._2))
  val idx = triple

但是在多次运行(本地未分发)同一个应用程序之后,我总是使用第一个代码获得更快的时间,在我看来这是一项额外的工作(首先收集然后并行化)。

整个应用程序(不仅仅是这个代码片段)在第一种情况下平均需要 48 秒,在第二种情况下至少需要 52 秒。

这怎么可能?

提前致谢

4

1 回答 1

1

我认为这是因为数据集太小了,在后一种情况下,您遭受了 shuffle 的调度来进行排序,这在本地操作时可能会更快。当您的数据集增长时,甚至可能无法收集到驱动程序中。

于 2015-05-12T16:31:01.237 回答