我是 Apache Spark 的新手,我知道核心数据结构是 RDD。现在我正在编写一些需要元素位置信息的应用程序。例如,将 ArrayList 转换为(Java)RDD 后,对于 RDD 中的每个整数,我需要知道它的(全局)数组下标。有可能做到吗?
据我所知,RDD 有一个take(int)函数,所以我相信位置信息仍然保留在 RDD 中。
我是 Apache Spark 的新手,我知道核心数据结构是 RDD。现在我正在编写一些需要元素位置信息的应用程序。例如,将 ArrayList 转换为(Java)RDD 后,对于 RDD 中的每个整数,我需要知道它的(全局)数组下标。有可能做到吗?
据我所知,RDD 有一个take(int)函数,所以我相信位置信息仍然保留在 RDD 中。
我相信在大多数情况下, zipWithIndex() 可以解决问题,并且会保留顺序。再次阅读评论。我的理解是,这完全意味着将订单保留在 RDD 中。
scala> val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3)
scala> val r2 = r1.zipWithIndex
scala> r2.foreach(println)
(c,2)
(d,3)
(e,4)
(f,5)
(g,6)
(a,0)
(b,1)
上面的例子证实了这一点。红色有 3 个分区,a 有索引 0,b 有索引 1,等等。
从本质上讲,RDD 的 zipWithIndex() 方法似乎可以做到这一点,但它不会保留创建 RDD 的数据的原始顺序。至少你会得到一个稳定的订单。
val orig: RDD[String] = ...
val indexed: RDD[(String, Long)] = orig.zipWithIndex()
您不太可能找到保留原始数据顺序的内容的原因隐藏在 zipWithIndex() 的 API 文档中:
“用它的元素索引压缩这个RDD。排序首先基于分区索引,然后是每个分区中项目的排序。所以第一个分区中的第一个项目获得索引0,最后一个分区中的最后一个项目接收最大索引。这类似于Scala的zipWithIndex,但它使用Long而不是Int作为索引类型。当这个RDD包含多个分区时,这种方法需要触发一个spark作业。
所以看起来原来的订单被丢弃了。如果保留原始顺序对您很重要,那么您似乎需要在创建 RDD之前添加索引。