2

这是我的例子。

val arr = Array((1,2), (1,3), (1,4), (2,3), (4,5))
val data = sc.parallelize(arr, 5)

data.glom.map(_length).collect
Array[Int] = Array(1, 1, 1, 1, 1)

val agg = data.reduceByKey(_+_)
agg.glom.map(_.length).collect
Array[Int] = Array(0, 1, 1, 0, 1)

val fil = agg.filter(_._2 < 4)
fil.glom.map(_.length).collect
Array[Int] = Array(0, 0, 1, 0, 0)

val sub = data.map{case(x,y) => (x, (x,y))}.subtractByKey(fil).map(_._2)
Array[(Int, Int)] = Array((1,4), (1,3), (1,2), (4,5))

sub.glom.map(_.length).collect
Array[Int] = Array(0, 3, 0, 0, 1)

我想知道的是平均分配分区。

data变量由五个分区组成,所有数据均分。

ex)par1: (1,2)
   par2: (1,3)
   par3: (1,4)
   par4: (2,3)
   par5: (4,5)

几次之后transformation operation,分配给sub变量的五个分区中只有两个被使用。

sub变量由五个分区组成,但并非所有数据都被均匀分区。

ex)par1: empty
   par2: (1,2),(1,3),(1,4)
   par3: empty
   par4: empty
   par5: (4,5)

如果我将另一个添加transformation operationsub变量中,将有 5 个可用分区,但只有 2 个分区用于操作。

ex)sub.map{case(x,y) => (x, x, (x,y))}

所以我想在操作数据时利用所有可用的分区。

我用了这个repartition方法,但它并不便宜。

ex) sub.repartition(5).glom.map(_.length).collect
Array[Int] = Array(0, 1, 1, 2, 0)

所以我正在寻找一种明智的方法来利用尽可能多的分区。

有什么好办法吗?

4

1 回答 1

3

所以repartition绝对是要走的路:)

您的示例有点过于简单,无法演示任何内容,因为 Spark 是为处理数十亿行而不是 5 行而构建的。repartition不会将完全相同数量的行放入每个分区,但它会均匀分布数据。尝试用 1.000.000 行重做您的示例,您会看到数据确实在repartition.

在处理大量数据的转换时,数据倾斜通常是一个大问题,并且重新分区数据确实会带来额外的时间成本,因为它需要对数据进行洗牌。但有时值得接受惩罚,因为它会使接下来的转换阶段运行得更快。

于 2017-03-27T11:09:25.527 回答