3

我对这个输出感到惊讶fold,我无法想象它在做什么。

我希望这something.fold(0, lambda a,b: a+1)将返回 中的元素数量something,因为折叠从每个元素开始0并添加1

sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 )
8

我来自 Scala,其中 fold 就像我所描述的那样工作。那么 fold 应该如何在 pyspark 中工作?谢谢你的想法。

4

2 回答 2

7

要了解这里发生了什么,让我们看一下 Sparkfold操作的定义。由于您使用的是 PySpark,我将展示代码的 Python 版本,但 Scala 版本表现出完全相同的行为(您也可以浏览 GitHub 上的源代码):

def fold(self, zeroValue, op):
    """
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given associative function and a neutral "zero
    value."
    The function C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should not
    modify C{t2}.
    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15
    """
    def func(iterator):
        acc = zeroValue
        for obj in iterator:
            acc = op(obj, acc)
        yield acc
    vals = self.mapPartitions(func).collect()
    return reduce(op, vals, zeroValue)

(为了比较,请参阅 的Scala 实现RDD.fold)。

Spark 的fold操作方式是先折叠每个分区,然后再折叠结果。问题是空分区被折叠到零元素,因此最终的驱动程序端折叠最终会为每个分区折叠一个值,而不是为每个非空分区折叠一个值。这意味着 的结果fold对分区数很敏感:

>>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
100
>>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
50
>>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
1

在最后一种情况下,发生的事情是单个分区被折叠到正确的值,然后将该值与驱动程序处的零值折叠以产生 1。

看来,Spark 的fold()操作实际上要求 fold 函数除了是可交换的,还是可交换的。实际上,Spark 中还有其他地方会强制执行此要求,例如,混洗分区中元素的顺序在运行中可能是不确定的(请参阅SPARK-5750)。

我已经打开了 Spark JIRA 票来调查这个问题:https ://issues.apache.org/jira/browse/SPARK-6416 。

于 2015-03-19T18:07:59.043 回答
2

让我试着举一个简单的例子来解释 spark 的 fold 方法。我将在这里使用 pyspark。

rdd1 = sc.parallelize(list([]),1)

上面的行将创建一个带有一个分区的空 rdd

rdd1.fold(10, lambda x,y:x+y)

这产量输出为 20

rdd2 = sc.parallelize(list([1,2,3,4,5]),2)

上面的行将创建值为 1 到 5 的 rdd,并且总共有 2 个分区

rdd2.fold(10, lambda x,y:x+y)

这产生输出为 45

因此,在上述情况下,为了简单起见,这里发生的情况是您将第 0 个元素设为 10。因此,您将在 RDD 中获得的所有数字的总和现在加上 10(即第 0 个元素+所有其他元素 = > 10+1+2+3+4+5 = 25)。现在我们也有两个分区(即分区数*第零元素=> 2*10 = 20) fold 发出的最终输出是 25+20 = 45

使用类似的过程可以清楚为什么 rdd1 上的折叠操作会产生 20 作为输出。

当我们有类似的空列表时,Reduce 失败rdd1.reduce(lambda x,y:x+y)

ValueError:不能减少()空RDD

如果我们认为我们可以在 rdd 中有空列表,可以使用折叠 rdd1.fold(0, lambda x,y:x+y)

正如预期的那样,这将产生输出为 0。

于 2018-10-07T10:38:32.430 回答