2

我有三个相同大小的 RDDrdd1包含一个字符串标识符,rdd2包含一个向量并rdd3包含一个整数值。

本质上,我想将这三个压缩在一起以获得 RDD,RDD[String,Vector,Int]但我不断得到不能压缩分区数不相等的 RDD。我怎样才能完全绕过 zip 来做上述事情?

4

3 回答 3

7

尝试:

rdd1.zipWithIndex.map(_.swap).join(rdd2.zipWithIndex.map(_.swap)).values
于 2016-11-03T15:54:39.440 回答
1

在拆分你的原始 RDD 之前,为每一行分配一个唯一的 id RDD.zipWithUniqueId。然后确保将 id 字段包含在您从原始 RDD 中吐出的每个 RDD 中,并将它们用作这些行的键(keyBy如果 id 还不是键,则使用),然后用于RDD.join重新组合行。

一个示例可能如下所示:

val rddWithKey = origionalRdd.zipWithUniqueID().map(_.swap)
val rdd1 = rddWithKey.map{case (key,value) => key -> value.stringField }
val rdd2 = rddWithKey.map{case (key,value) => key -> value.intField }

/*transformations on rdd1 and 2*/

val 重组 = rdd1.join(rdd2)

于 2016-11-03T18:01:42.193 回答
1

它们都具有相同数量的元素吗? zipPartitions用于连接 RDD 在特殊情况下,即它们具有完全相同的分区数并且每个分区中的元素数完全相同。

你的案子没有这样的保证。rdd3在实际上是空的情况下你想做什么?你应该得到一个没有元素的结果 RDD 吗?

编辑:如果您知道长度完全相同,LostInOverflow 的答案将起作用。

于 2016-11-03T15:55:07.370 回答