1

我正在尝试删除 RDD [String] 的最后一个元素。

到目前为止,我正在这样做:

val n: Long = rdd.count()
val startIndex: Long = n - 1

val lastElem = rdd.zipWithIndex()
  .filter{ case (_, index) => index >= startIndex }
  .keys
  .collect()

val newRdd = rdd.filter(x => !x.equalsIgnoreCase(lastElem(0))).cache()

也就是说,取rdd的最后一个元素,并过滤它以获得所有元素减去最后一个元素。

这工作得很好,但有没有更好的方法呢?

4

2 回答 2

0

initscala中有一个函数可以为您提供除了集合中的最后一个元素之外的所有元素。你可以利用那个

val newRdd = sc.parallelize(rdd.collect().toList.init)

这应该通过删除最后一个元素为您提供新的 rdd,并且比您的方法更好,因为 collect 只使用一次。

而且 rdd 是分布式的,如果不将它收集到一个节点,就无法判断哪一个是最后一个字符串。

这里我已经收集到驱动节点了您可以使用另一种技术收集到一个执行者并使用init函数

于 2018-05-19T01:55:22.427 回答
0

假设顺序是明确定义的(上游没有广泛的转换,并且输入源保证了明确定义的元素顺序),您当前的解决方案是最好的。

特别是你应该避免不可扩展和整体无用(如果 RDD 中的值顺序没有很好地定义,那么在 中的顺序值collected Array也没有很好地定义。)collect

使用前请务必了解限制。引用文档

请注意,某些 RDD,例如那些由 groupBy() 返回的 RDD,不保证分区中元素的顺序。因此,不能保证分配给每个元素的唯一 ID,如果重新评估 RDD,甚至可能会更改。如果需要固定的排序来保证相同的索引分配,则应该使用 sortByKey() 对 RDD 进行排序或将其保存到文件中。

于 2019-02-04T21:39:09.050 回答