0

假设我有一个数字列表:

val list = List(4,12,3,6,9)

对于列表中的每个元素,我需要找到滚动总和,即 最终输出应该是:

List(4, 16, 19, 25, 34)

是否有任何转换允许我们将列表的两个元素(当前和前一个)作为输入并基于两者进行计算?就像是map(initial)((curr,prev) => curr+prev)

我想在不维护任何共享全局状态的情况下实现这一目标。

编辑:我希望能够对 RDD 进行相同类型的计算。

4

3 回答 3

4

您可以使用scanLeft

list.scanLeft(0)(_ + _).tail
于 2017-06-13T09:35:45.493 回答
1

下面的cumSum方法应该适用于任何具有隐式可用的RDD[N],例如, , ,等。NNumeric[N]IntLongBigIntDouble

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def cumSum[N : Numeric : ClassTag](rdd: RDD[N]): RDD[N] = {
  val num = implicitly[Numeric[N]]
  val nPartitions = rdd.partitions.length

  val partitionCumSums = rdd.mapPartitionsWithIndex((index, iter) => 
    if (index == nPartitions - 1) Iterator.empty
    else Iterator.single(iter.foldLeft(num.zero)(num.plus))
  ).collect
   .scanLeft(num.zero)(num.plus)

  rdd.mapPartitionsWithIndex((index, iter) => 
    if (iter.isEmpty) iter
    else {
      val start = num.plus(partitionCumSums(index), iter.next)
      iter.scanLeft(start)(num.plus)
    }
  )
}

将这种方法推广到任何具有“零”(即任何幺半群)的关联二元运算符应该是相当简单的。关联性是并行化的关键。如果没有这种关联性,您通常会被困RDD在以串行方式运行的条目中。

于 2017-06-15T08:12:50.070 回答
0

我不知道 spark RDD 支持哪些功能,所以我不确定这是否满足您的条件,因为我不知道是否支持 zipWithIndex(如果答案没有帮助,请通过评论告诉我和我将删除我的答案):

list.zipWithIndex.map{x => list.take(x._2+1).sum}

这段代码对我有用,它总结了元素。它获取列表元素的索引,然后在列表中添加相应的 n 个第一个元素(注意 +1,因为 zipWithIndex 以 0 开头)。

打印时,我得到以下信息:

List(4, 16, 19, 25, 34)
于 2017-06-13T10:25:07.277 回答