49

我有两个想要加入的 RDD,它们看起来像这样:

val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]

碰巧 的键值rdd1是唯一的,而 的元组键值rdd2也是唯一的。我想加入这两个数据集,以便得到以下 rdd:

val rdd_joined:RDD[((T,W), (U,V))]

实现这一目标的最有效方法是什么?以下是我想到的一些想法。

选项1:

val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})

选项 2:

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)

选项 1 将收集所有数据以掌握,对吗?因此,如果 rdd1 很大(在我的情况下它相对较大,尽管比 rdd2 小一个数量级),这似乎不是一个好的选择。选项 2 做了一个丑陋的独特的笛卡尔积,这似乎也非常低效。我想到的另一种可能性(但尚未尝试)是执行选项 1 并广播地图,尽管以“智能”方式广播会更好,以便地图的键与的键rdd2

有没有人遇到过这种情况?我很高兴有你的想法。

谢谢!

4

2 回答 2

59

rdd1一种选择是通过收集到驱动程序并将其广播给所有映射器来执行广播连接;正确完成,这将使我们避免对大型rdd2RDD 进行昂贵的洗牌:

val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))

val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
  val m = rdd1Broadcast.value
  for {
    ((t, w), u) <- iter
    if m.contains(t)
  } yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)

告诉 Spark这个preservesPartitioning = truemap 函数不会修改 ; 的键rdd2rdd2这将允许 Spark 避免为任何基于(t, w)密钥连接的后续操作重新分区。

此广播可能效率低下,因为它涉及驱动程序的通信瓶颈。原则上,可以在不涉及驱动程序的情况下将一个 RDD 广播到另一个 RDD;我有一个原型,我想对其进行概括并添加到 Spark。

另一种选择是重新映射rdd2并使用 Sparkjoin方法的键;这将涉及rdd2(并且可能rdd1)的全面洗牌:

rdd1.join(rdd2.map {
  case ((t, w), u) => (t, (w, u))
}).map {
  case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()

在我的示例输入中,这两种方法都会产生相同的结果:

res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))

第三种选择是重组rdd2t使其成为关键,然后执行上述连接。

于 2013-07-17T02:48:39.547 回答
14

另一种方法是创建一个自定义分区器,然后使用 zipPartitions 加入您的 RDD。

import org.apache.spark.HashPartitioner

class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {

  override def getPartition(key: Any): Int = key match {
    case k: Tuple2[Int, String] => super.getPartition(k._1)
    case _ => super.getPartition(key)
  }

}

val numSplits = 8
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))

val result = rdd2.zipPartitions(rdd1)(
  (iter2, iter1) => {
    val m = iter1.toMap
    for {
        ((t: Int, w), u) <- iter2
        if m.contains(t)
      } yield ((t, w), (u, m.get(t).get))
  }
).partitionBy(new HashPartitioner(numSplits))

result.glom.collect
于 2014-04-18T00:25:56.190 回答