3

如何有效地将 a 拆分RDD[T]为带有元素的Seq[RDD[T]]/并保留原始顺序?Iterable[RDD[T]]n

我希望能够写出这样的东西

RDD(1, 2, 3, 4, 5, 6, 7, 8, 9).split(3)

这应该会导致类似

Seq(RDD(1, 2, 3), RDD(4, 5, 6), RDD(7, 8, 9))

spark提供这样的功能吗?如果不是,那么实现这一目标的有效方法是什么?

val parts = rdd.length / n
val rdds = rdd.zipWithIndex().map{ case (t, i) => (i - (i % parts), t)}.groupByKey().values.map(iter => sc.parallelize(iter.toSeq)).collect

看起来不是很快。。

4

1 回答 1

0

从技术上讲,您可以按照您的建议进行操作。但是,在利用计算集群来执行大数据的分布式处理的情况下,它确实没有意义。它首先违背了 Spark 的全部观点。如果您执行 groupByKey 然后尝试将它们提取到单独的 RDD 中,您实际上是将 RDD 中分布的所有数据拉到驱动程序上,然后将每个数据重新分配回集群。如果驱动程序无法加载整个数据文件,它也将无法执行此操作。

您不应将大型数据文件从本地文件系统加载到驱动程序节点。您应该将文件移动到 HDFS 或 S3 等分布式文件系统上。然后,您可以通过val lines = SparkContext.textFile(...)RDD 行将单个大数据文件加载到集群中。当你这样做时,集群中的每个工作人员将只加载文件的一部分,这是可以做到的,因为数据已经在分布式文件系统中分布在集群中。

如果您随后需要将数据组织成对数据的功能处理很重要的“批次”,您可以使用适当的批次标识符来键入数据,例如:val batches = lines.keyBy( line => lineBatchID(line) )

然后可以将每个批次归结为批次级别的摘要,这些摘要可以归结为单个整体结果。

为了测试 Spark 代码,可以将数据文件的样本加载到单台机器上。但是当涉及到完整的数据集时,您应该利用分布式文件系统和 Spark 集群来处理这些数据。

于 2017-03-15T17:36:31.873 回答