8

使用 Scala 标准库时,我可以这样做:

scala> val scalaList = List(1,2,3)
scalaList: List[Int] = List(1, 2, 3)

scala> scalaList.foldLeft(0)((acc,n)=>acc+n)
res0: Int = 6

从许多 Int 中制作一个 Int。

我可以做这样的事情:

scala> scalaList.foldLeft("")((acc,n)=>acc+n.toString)
res1: String = 123

从许多 Int 中制作一个 String。

因此, foldLeft 可以是同质的或异构的,无论我们想要什么,它都在一个 API 中。

在 Spark 中,如果我想要许多 Int 中的一个 Int,我可以这样做:

scala> val rdd = sc.parallelize(List(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12
scala> rdd.fold(0)((acc,n)=>acc+n)
res1: Int = 6

fold API 类似于 foldLeft,但它只是同构的,一个 RDD[Int] 只能产生带有 fold 的 Int。

spark中也有一个聚合API:

scala> rdd.aggregate("")((acc,n)=>acc+n.toString, (s1,s2)=>s1+s2)
res11: String = 132

它是异构的,一个 RDD[Int] 现在可以产生一个字符串。

那么,为什么在 Spark 中 fold 和 aggregate 实现为两个不同的 API?

为什么它们不像 foldLeft 那样设计,既可以是同质的又可以是异质的?

(我对 Spark 很陌生,如果这是一个愚蠢的问题,请原谅。)

4

3 回答 3

4

fold可以更有效地实现,因为它不依赖于固定的评估顺序。所以每个集群节点可以fold并行自己的块,然后fold在最后一个小的整体。而foldLeft每个元素都必须按顺序折叠,并且不能并行完成。

(为了方便起见,为常见情况提供一个更简单的 API 也很好。标准库也有reduce这个foldLeft原因)

于 2014-10-29T16:36:18.600 回答
2

特别是在 Spark 中,计算是分布式并行完成的,因此foldLeft无法像在标准库中那样实现。fold相反,聚合需要两个函数,一个对 type 的每个元素执行类似的操作T,产生一个 type 的值U,另一个将U来自每个分区的值组合成最终值:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
于 2014-10-29T17:06:04.593 回答
1

foldLeft, foldRight, reduceLeft, reduceRight, scanLeftscanRight是累积参数可以与输入参数不同的操作 ( (A, B) -> B) 并且这些操作只能按顺序执行。

fold是一种运算,其中累积的参数必须与输入参数 ( (A, A) -> A) 的类型相同。然后可以并行执行。

aggregation是一种操作,其中累积的参数可以与输入参数的类型不同,但是您必须提供一个附加函数来定义如何在最终结果中聚合累积的参数。此操作允许并行执行。该aggregation操作是foldLeft和的组合fold

有关更多详细信息,您可以查看“并行编程”课程的课程视频:

于 2017-07-09T17:44:36.757 回答