1

为了减少两个 RDD 加入过程中的洗牌,我决定先使用 HashPartitioner 对它们进行分区。这是我的做法。我做得对吗,还是有更好的方法来做到这一点?

val rddA = ...
val rddB = ...

val numOfPartitions = rddA.getNumPartitions

val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions))
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions))

val rddAB = rddApartitioned.join(rddBpartitioned)
4

2 回答 2

5

为了减少两个 RDD 连接过程中的洗牌,

令人惊讶的是,重新分区会减少甚至消除洗牌,这是一种普遍的误解。它没有。重新分区是最纯粹的洗牌。它不会节省时间、带宽或内存。

使用主动分区器背后的基本原理是不同的 - 它允许您洗牌一次,并重用状态,以执行多个按键操作,而无需额外的洗牌(尽管据我所知,不一定没有额外的网络流量,因为 co -partitioning 并不意味着 co-location,不包括在单个操作中发生洗牌的情况)。

所以你的代码是正确的,但在你加入的情况下,它不会给你带来任何东西。

于 2019-03-21T12:13:57.077 回答
0

只需一条评论,如果 and 有多个操作,最好在后面附加,否则.persist(),所有操作都会评估and的整个沿袭,这将导致哈希分区一次又一次地发生。.partitionByrddApartitionedrddBpartitionedrddApartitionedrddBpartitioned

val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions)).persist()
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions)).persist()
于 2020-08-09T11:35:04.877 回答