12

我是 Apache Spark 的新手,我知道核心数据结构是 RDD。现在我正在编写一些需要元素位置信息的应用程序。例如,将 ArrayList 转换为(Java)RDD 后,对于 RDD 中的每个整数,我需要知道它的(全局)数组下标。有可能做到吗?

据我所知,RDD 有一个take(int)函数,所以我相信位置信息仍然保留在 RDD 中。

4

2 回答 2

16

我相信在大多数情况下, zipWithIndex() 可以解决问题,并且会保留顺序。再次阅读评论。我的理解是,这完全意味着将订单保留在 RDD 中。

scala> val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3)
scala> val r2 = r1.zipWithIndex
scala> r2.foreach(println)
(c,2)
(d,3)
(e,4)
(f,5)
(g,6)
(a,0)
(b,1)

上面的例子证实了这一点。红色有 3 个分区,a 有索引 0,b 有索引 1,等等。

于 2014-09-28T04:13:35.277 回答
11

从本质上讲,RDD 的 zipWithIndex() 方法似乎可以做到这一点,但它不会保留创建 RDD 的数据的原始顺序。至少你会得到一个稳定的订单。

val orig: RDD[String] = ...
val indexed: RDD[(String, Long)] = orig.zipWithIndex()

您不太可能找到保留原始数据顺序的内容的原因隐藏在 zipWithIndex() 的 API 文档中:

“用它的元素索引压缩这个RDD。排序首先基于分区索引,然后是每个分区中项目的排序。所以第一个分区中的第一个项目获得索引0,最后一个分区中的最后一个项目接收最大索引。这类似于Scala的zipWithIndex,但它使用Long而不是Int作为索引类型。当这个RDD包含多个分区时,这种方法需要触发一个spark作业。

所以看起来原来的订单被丢弃了。如果保留原始顺序对您很重要,那么您似乎需要在创建 RDD之前添加索引。

于 2014-09-25T20:53:33.303 回答