97

为什么 Scala 和 Spark 和 Scalding 等框架同时具有reducefoldLeft?那么reduce和 和有什么区别fold呢?

4

4 回答 4

268

减少 vs foldLeft

与该主题相关的任何其他stackoverflow答案中都没有明确提到的一个很大的区别是,reduce应该给它一个可交换的幺半群,即一个既可交换又可关联的操作。这意味着操作可以并行化。

reduce这种区别对于大数据/MPP/分布式计算非常重要,甚至存在的全部原因。集合可以被切分,reducecan 对每个块进行操作,然后reducecan 对每个块的结果进行操作——事实上,分块的级别不需要停止一层。我们也可以切碎每一块。这就是为什么如果给定无限数量的 CPU,对列表中的整数求和是 O(log N)。

如果您只查看签名,则没有reduce存在的理由,因为您可以reduce使用foldLeft. 的功能foldLeft大于 的功能reduce

但是你不能并行化 a foldLeft,所以它的运行时间总是 O(N) (即使你输入一个可交换的幺半群)。这是因为假设操作不是可交换的幺半群,因此累积值将通过一系列顺序聚合计算。

foldLeft不假定交换性或结合性。关联性提供了拆分集合的能力,而交换性使累积变得容易,因为顺序并不重要(因此聚合每个块的每个结果的顺序无关紧要)。严格来说,并行化不需要交换性,例如分布式排序算法,它只是使逻辑更容易,因为你不需要给你的块排序。

如果您查看 Spark 文档,reduce它专门说“... commutative and associative binary operator”

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

这是证明reduce不仅仅是一个特例foldLeft

scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par

scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds

scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds

减少与折叠

现在这是它更接近 FP / 数学根源的地方,并且解释起来有点棘手。Reduce 被正式定义为 MapReduce 范式的一部分,它处理无序集合(多集),Fold 被正式定义为递归(参见 catamorphism),因此假定集合的结构/序列。

Scalding没有fold方法,因为在(严格的)Map Reduce 编程模型下我们无法定义fold,因为块没有排序,fold只需要关联性,而不是交换性。

简而言之,reduce没有累积顺序的工作,fold需要累积的顺序,并且需要零值的累积顺序而不是区分它们的零值的存在。严格来说reduce 应该适用于空集合,因为它的零值可以通过取任意值x然后求解来推断x op y = x,但这不适用于非交换运算,因为可以存在不同的左右零值(即x op y != y op x)。当然 Scala 不会费心去计算这个零值是什么,因为这需要做一些数学运算(这可能是不可计算的),所以只是抛出一个异常。

似乎(在词源学中经常出现这种情况)这个原始的数学含义已经丢失,因为编程中唯一明显的区别是签名。结果是它reduce已成为 的同义词fold,而不是保留它在 MapReduce 中的原始含义。现在,这些术语通常可以互换使用,并且在大多数实现中表现相同(忽略空集合)。怪异会因特殊性而加剧,例如 Spark,我们现在将讨论这些。

所以 Spark确实fold,但是子结果(每个分区一个)组合的顺序(在撰写本文时)与任务完成的顺序相同 - 因此是不确定的。感谢@CafeFeed 指出folduses runJob,在阅读完代码后我意识到它是不确定的。Spark 具有treeReduce但没有treeFold.

结论

reduce在应用于非空序列时,fold甚至在应用到非空序列时也存在差异。前者被定义为任意顺序集合的 MapReduce 编程范式的一部分 ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) 并且应该假设运算符除了是可交换的之外关联以给出确定性的结果。后者是根据同构定义的,并且要求集合具有序列的概念(或递归定义,如链表),因此不需要交换运算符。

在实践中,由于编程的非数学性质,reduce并且fold倾向于以相同的方式表现,或者正确(如在 Scala 中)或不正确(如在 Spark 中)。

额外:我对 Spark API 的看法

fold我的观点是,如果在 Spark 中完全放弃使用该术语,就可以避免混淆。至少 spark 在他们的文档中确实有一个注释:

这与在 Scala 等函数式语言中为非分布式集合实现的折叠操作有些不同。

于 2014-08-06T11:07:42.893 回答
10

如果我没记错的话,即使 Spark API 不需要它, fold 也要求 f 是可交换的。因为分区聚合的顺序是不确定的。例如,在以下代码中,仅对第一个打印输出进行排序:

import org.apache.spark.{SparkConf, SparkContext}

object FoldExample extends App{

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Simple Application")
  implicit val sc = new SparkContext(conf)

  val range = ('a' to 'z').map(_.toString)
  val rdd = sc.parallelize(range)

  println(range.reduce(_ + _))
  println(rdd.reduce(_ + _))
  println(rdd.fold("")(_ + _))
}  

打印出:

abcdefghijklmnopqrstuvwxyz

abcghituvjklmwxyzqrsdefnop

defghinopjklmqrstuvabcwxyz

于 2015-03-16T16:24:58.610 回答
3

fold在 Apache Spark 中与fold在非分布式集合中不同。事实上,它需要交换函数来产生确定性结果:

这与在 Scala 等函数式语言中为非分布式集合实现的折叠操作有些不同。此折叠操作可以单独应用于分区,然后将这些结果折叠到最终结果中,而不是以某些定义的顺序顺序将折叠应用于每个元素。对于不可交换的函数,结果可能与应用于非分布式集合的折叠结果不同。

已由Mishael Rosenthal展示,并由Make42他的评论中提出。

有人建议,观察到的行为与HashPartitionerwhen 实际上parallelize不洗牌和不使用HashPartitioner.

import org.apache.spark.sql.SparkSession

/* Note: standalone (non-local) mode */
val master = "spark://...:7077"  

val spark = SparkSession.builder.master(master).getOrCreate()

/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

解释:

foldRDD的结构

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  var jobResult: T
  val cleanOp: (T, T) => T
  val foldPartition = Iterator[T] => T
  val mergeResult: (Int, T) => Unit
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

与RDD的结构reduce相同:

def reduce(f: (T, T) => T): T = withScope {
  val cleanF: (T, T) => T
  val reducePartition: Iterator[T] => Option[T]
  var jobResult: Option[T]
  val mergeResult =  (Int, Option[T]) => Unit
  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

whererunJob是在不考虑分区顺序的情况下执行的,导致需要交换函数。

foldPartition并且reducePartition在处理顺序方面是等效的,并且有效地(通过继承和委托)由reduceLeftfoldLefton实现TraversableOnce

结论:fold在RDD上不能依赖于块的顺序,需要交换性和关联性

于 2016-06-05T17:21:28.190 回答
2

Scalding 的另一个区别是在 Hadoop 中使用组合器。

想象一下,您的操作是可交换的幺半群,使用reduce它也将应用于 map 端,而不是将所有数据改组/排序到 reducer。对于foldLeft,情况并非如此。

pipe.groupBy('product) {
   _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
   // reduce is .mapReduceMap in disguise
}

pipe.groupBy('product) {
   _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}

在 Scalding 中将操作定义为 monoid 始终是一个好习惯。

于 2014-08-07T23:53:17.753 回答