假设我有一个数字列表:
val list = List(4,12,3,6,9)
对于列表中的每个元素,我需要找到滚动总和,即 最终输出应该是:
List(4, 16, 19, 25, 34)
是否有任何转换允许我们将列表的两个元素(当前和前一个)作为输入并基于两者进行计算?就像是map(initial)((curr,prev) => curr+prev)
我想在不维护任何共享全局状态的情况下实现这一目标。
编辑:我希望能够对 RDD 进行相同类型的计算。
假设我有一个数字列表:
val list = List(4,12,3,6,9)
对于列表中的每个元素,我需要找到滚动总和,即 最终输出应该是:
List(4, 16, 19, 25, 34)
是否有任何转换允许我们将列表的两个元素(当前和前一个)作为输入并基于两者进行计算?就像是map(initial)((curr,prev) => curr+prev)
我想在不维护任何共享全局状态的情况下实现这一目标。
编辑:我希望能够对 RDD 进行相同类型的计算。
您可以使用scanLeft
list.scanLeft(0)(_ + _).tail
下面的cumSum
方法应该适用于任何具有隐式可用的RDD[N]
,例如, , ,等。N
Numeric[N]
Int
Long
BigInt
Double
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
def cumSum[N : Numeric : ClassTag](rdd: RDD[N]): RDD[N] = {
val num = implicitly[Numeric[N]]
val nPartitions = rdd.partitions.length
val partitionCumSums = rdd.mapPartitionsWithIndex((index, iter) =>
if (index == nPartitions - 1) Iterator.empty
else Iterator.single(iter.foldLeft(num.zero)(num.plus))
).collect
.scanLeft(num.zero)(num.plus)
rdd.mapPartitionsWithIndex((index, iter) =>
if (iter.isEmpty) iter
else {
val start = num.plus(partitionCumSums(index), iter.next)
iter.scanLeft(start)(num.plus)
}
)
}
将这种方法推广到任何具有“零”(即任何幺半群)的关联二元运算符应该是相当简单的。关联性是并行化的关键。如果没有这种关联性,您通常会被困RDD
在以串行方式运行的条目中。
我不知道 spark RDD 支持哪些功能,所以我不确定这是否满足您的条件,因为我不知道是否支持 zipWithIndex(如果答案没有帮助,请通过评论告诉我和我将删除我的答案):
list.zipWithIndex.map{x => list.take(x._2+1).sum}
这段代码对我有用,它总结了元素。它获取列表元素的索引,然后在列表中添加相应的 n 个第一个元素(注意 +1,因为 zipWithIndex 以 0 开头)。
打印时,我得到以下信息:
List(4, 16, 19, 25, 34)