我会将 a 拆分RDD[(K, V)]
为存储桶,例如输出类型为 a List[(K, RDD[V])]
,这是我的建议。但我不满意,因为它依赖于keysNumber
运行原始 RDD。是否存在其他需要较少运行原始 RDD 的处理方式。如果不是,您如何看待在递归调用之前放入缓存休息的事实,肯定它会更快,但 Spark 会因为与第一个 RDD 的血缘关系而最小化内存中的存储,还是会节省~keysNumber
原始 RDD 的最小版本的时间。谢谢你。
def kindOfGroupByKey[K : ClassTag, V : ClassTag](rdd: RDD[(K, V)], keys: List[K] = List.empty[K]): List[(K, RDD[V])] = {
val keysIn: List[K] = if (keys.isEmpty) rdd.map(_._1).distinct.collect.toList else keys
@annotation.tailrec
def go(rdd2: RDD[(K, V)], keys: List[K], output: List[(K, RDD[V])]): List[(K, RDD[V])] = {
val currentKey :: keyxs = keys
val filtered = rdd2.filter(_._1 == currentKey)
val rest = rdd2.filter(_._1 != currentKey)
val updatedOutput = (currentKey, filtered.map(_._2)) :: output
if (keys.isEmpty) updatedOutput.reverse
// Supposing rdd is cached, it is good to cache rest or does it will generate many smallest cached version of rdd which risk to overload ram ?
else go(rest, keyxs, updatedOutput)
}
go(rdd, keysIn, List.empty[(K, RDD[V])])
}