谁能解释reducebykey
,groupbykey
和之间aggregatebykey
的区别combinebykey
?我已阅读有关此的文件,但无法理解确切的差异。
带有示例的解释会很棒。
谁能解释reducebykey
,groupbykey
和之间aggregatebykey
的区别combinebykey
?我已阅读有关此的文件,但无法理解确切的差异。
带有示例的解释会很棒。
groupByKey:
句法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map((x,y) => (x,sum(y)))
groupByKey
当数据通过网络发送并在减少的工作人员身上收集时,可能会导致磁盘不足问题。
reduceByKey:
句法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
数据在每个分区合并,每个分区的一个键只有一个输出通过网络发送。reduceByKey
需要将所有值组合成另一个具有完全相同类型的值。
聚合键:
与 相同reduceByKey
,它采用初始值。
3个参数作为输入
例子:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
输出: 按键汇总结果栏 -> 3 foo -> 5
组合键:
3个参数作为输入
aggregateByKey
,不需要总是传递常量,我们可以传递一个返回新值的函数。例子:
val result = rdd.combineByKey(
(v) => (v,1),
( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2))
).map( { case (k,v) => (k,v._1/v._2.toDouble) })
result.collect.foreach(println)
reduceByKey
, aggregateByKey
,combineByKey
优先于 groupByKey
参考: 避免 groupByKey
groupByKey()
只是根据一个键对您的数据集进行分组。当RDD尚未分区时,它将导致数据混洗。reduceByKey()
类似于分组+聚合。我们可以说reduceByKey()
等价于 dataset.group(...).reduce(...)。与groupByKey()
.aggregateByKey()
逻辑上相同,reduceByKey()
但它允许您返回不同类型的结果。换句话说,它允许您将输入作为类型 x 并将聚合结果作为类型 y。例如 (1,2),(1,4) 作为输入, (1,"six") 作为输出。它还采用将在每个键的开头应用的零值。注意:一个相似之处是它们都是宽操作。
虽然 reducebykey 和 groupbykey 都会产生相同的答案,但 reduceByKey 示例在大型数据集上效果更好。这是因为 Spark 知道它可以在对数据进行混洗之前将输出与每个分区上的公共键组合起来。
另一方面,当调用 groupByKey 时,所有的键值对都被打乱了。这是通过网络传输的大量不必要的数据。
有关更详细的信息,请查看以下链接
尽管它们都将获取相同的结果,但两个函数的性能存在显着差异。reduceByKey()
与groupByKey()
.
在中,在对数据进行混洗之前,reduceByKey()
将同一台机器上具有相同键的对组合(通过使用传入的函数)。reduceByKey()
然后再次调用该函数以减少来自每个分区的所有值以产生一个最终结果。
在groupByKey()
中,所有的键值对都被打乱了。这是通过网络传输的大量不必要的数据。
ReduceByKey reduceByKey(func, [numTasks])
-
数据被组合在一起,以便在每个分区中,每个键至少应该有一个值。然后 shuffle 发生,并通过网络发送到某个特定的 executor 以执行某些操作,例如 reduce。
GroupByKey -groupByKey([numTasks])
它不会合并键的值,而是直接发生随机播放过程,这里有很多数据被发送到每个分区,与初始数据几乎相同。
并且每个键的值的合并是在洗牌之后完成的。这里大量数据存储在最终工作节点上,因此导致内存不足问题。
AggregateByKey -aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
与 reduceByKey 类似,但您可以在执行聚合时提供初始值。
用于reduceByKey
reduceByKey
当我们在大型数据集上运行时可以使用。
reduceByKey
当输入和输出值类型相同时aggregateByKey
此外,它建议不要使用groupByKey
和喜欢reduceByKey
。详情可以参考这里。
您还可以参考此问题以更详细地了解如何reduceByKey
和aggregateByKey
.
那么除了这4个,我们还有
foldByKey 与 reduceByKey 相同,但具有用户定义的零值。
AggregateByKey 将 3 个参数作为输入并使用 2 个函数进行合并(一个用于在相同分区上合并,另一个用于跨分区合并值。第一个参数是 ZeroValue)
然而
ReduceBykey 只接受 1 个参数,这是一个用于合并的函数。
CombineByKey 需要 3 个参数,所有 3 个都是函数。与 aggregateBykey 类似,但它可以具有 ZeroValue 函数。
GroupByKey 不带参数并将所有内容分组。此外,它是跨分区数据传输的开销。