我有三个相同大小的 RDDrdd1
包含一个字符串标识符,rdd2
包含一个向量并rdd3
包含一个整数值。
本质上,我想将这三个压缩在一起以获得 RDD,RDD[String,Vector,Int]
但我不断得到不能压缩分区数不相等的 RDD。我怎样才能完全绕过 zip 来做上述事情?
我有三个相同大小的 RDDrdd1
包含一个字符串标识符,rdd2
包含一个向量并rdd3
包含一个整数值。
本质上,我想将这三个压缩在一起以获得 RDD,RDD[String,Vector,Int]
但我不断得到不能压缩分区数不相等的 RDD。我怎样才能完全绕过 zip 来做上述事情?
尝试:
rdd1.zipWithIndex.map(_.swap).join(rdd2.zipWithIndex.map(_.swap)).values
在拆分你的原始 RDD 之前,为每一行分配一个唯一的 id RDD.zipWithUniqueId
。然后确保将 id 字段包含在您从原始 RDD 中吐出的每个 RDD 中,并将它们用作这些行的键(keyBy
如果 id 还不是键,则使用),然后用于RDD.join
重新组合行。
一个示例可能如下所示:
val rddWithKey = origionalRdd.zipWithUniqueID().map(_.swap)
val rdd1 = rddWithKey.map{case (key,value) => key -> value.stringField }
val rdd2 = rddWithKey.map{case (key,value) => key -> value.intField }
/*transformations on rdd1 and 2*/
val 重组 = rdd1.join(rdd2)
它们都具有相同数量的元素吗? zipPartitions
用于连接 RDD 在特殊情况下,即它们具有完全相同的分区数并且每个分区中的元素数完全相同。
你的案子没有这样的保证。rdd3
在实际上是空的情况下你想做什么?你应该得到一个没有元素的结果 RDD 吗?
编辑:如果您知道长度完全相同,LostInOverflow 的答案将起作用。