5

我有 RDD,其中每条记录都是 int:

[0,1,2,3,4,5,6,7,8]

我需要做的就是将这个 RDD 分成多个批次。即制作另一个 RDD,其中每个元素都是固定大小的元素列表:

[[0,1,2], [3,4,5], [6,7,8]]

这听起来微不足道,但是,我在过去几天感到困惑,除了以下解决方案之外找不到任何东西:

  1. 使用 ZipWithIndex 枚举 RDD 中的记录:

    [0,1,2,3,4,5] -> [(0, 0),(1, 1),(2, 2),(3, 3),(4, 4),(5, 5)]

  2. 使用 map() 迭代这个 RDD 并计算索引index = int(index / batchSize)

    [1,2,3,4,5,6] -> [(0, 0),(0, 1),(0, 2),(1, 3),(1, 4),(1, 5)]

  3. 然后按生成的索引分组。

    [(0, [0,1,2]), (1, [3,4,5])]

这会给我我需要的东西,但是,我不想在这里使用 group by。当您使用普通的 Map Reduce 或 Apache Crunch 之类的抽象时,这很简单。但是有没有办法在 Spark 中产生类似的结果而不使用大量的 group by?

4

2 回答 2

0

您没有清楚地解释为什么需要固定大小的 RDD,这取决于您要完成的工作可能会有更好的解决方案,但要回答已提出的问题,我看到以下选项:
1)基于项目数和批量大小。例如,如果您在原始 RDD 中有 1000 个项目并希望将它们分成 10 个批次,您最终将应用 10 个过滤器,第一个检查索引是否为 [0, 99],第二个检查索引是否为 [100, 199]等等。应用每个过滤器后,您将拥有一个 RDD。需要注意的是,原始 RDD 可能会在过滤之前被缓存。优点:每个生成的 RDD 可以单独处理,不必在一个节点上完全分配。缺点:随着批次数量的增加,这种方法变得更慢。
2) 逻辑上与此类似,但不是过滤器,您只需实现一个自定义分区器,该分区器根据索引(键)返回分区 id,如下所述:自定义分区器用于同等大小的分区。优点:比过滤器快。缺点:每个分区必须适合一个节点。
3)如果原始RDD中的顺序并不重要,只需要大致相同的分块,你可以合并/重新分区,这里解释https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd -partitions.html

于 2017-11-13T16:27:05.827 回答
0

也许你可以使用aggregateByKey ,在这种情况下它比groupByKey更快更轻量级。我尝试在 10 个执行器上将 5 亿条数据拆分为 256 个大小的批次,只需要半个小时就可以完成。

data = data.zipWithIndex().map(lambda x: (x[1] / 256, x[0]))
data = data.aggregateByKey(list(), lambda x, y: x + [y], add)

有关更多信息,请参阅 reduceByKey vs groupByKey vs aggregateByKey vs combineByKey 之间的 Spark 差异

于 2019-08-16T03:49:53.767 回答