我想知道在什么情况下 Spark 将作为 UDAF 功能的一部分执行合并。
动机: 我在我的 Spark 项目中的一个窗口上使用了很多 UDAF 函数。我经常想回答这样的问题:
在 30 天的窗口内,与当前交易在同一国家/地区进行了多少次信用卡交易?
该窗口将从当前事务开始,但不会将其包括在计数中。它需要当前交易的价值才能知道过去 30 天内要计算哪个国家/地区。
val rollingWindow = Window
.partitionBy(partitionByColumn)
.orderBy(orderByColumn.desc)
.rangeBetween(0, windowSize)
df.withColumn(
outputColumnName,
customUDAF(inputColumn, orderByColumn).over(rollingWindow))
我写了我的 customUDAF 来进行计数。我总是使用.orderBy(orderByColumn.desc)
并感谢.desc
当前交易在计算过程中首先出现在窗口中。
UDAF 函数需要实现merge
在并行计算中合并两个中间聚合缓冲区的函数。如果发生任何合并,current transaction
不同缓冲区的 my 可能不一样,UDAF 的结果将不正确。
我编写了一个 UDAF 函数,它计算我的数据集上的合并次数,并只保留窗口中的第一个事务以与当前事务进行比较。
class FirstUDAF() extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
.add("y", StringType)
def bufferSchema = new StructType()
.add("first", StringType)
.add("numMerge", IntegerType)
def dataType = new StructType()
.add("firstCode", StringType)
.add("numMerge", IntegerType)
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = ""
buffer(1) = 1
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer.getString(0) == "")
buffer(0) = input.getString(0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
}
def evaluate(buffer: Row) = buffer
}
当我在具有 16 个 cpu 的本地主机上使用 spark 2.0.1 运行它时,从来没有任何合并,并且窗口中的第一个事务始终是当前事务。这就是我要的。在不久的将来,我将在 x100 更大的数据集和真正的分布式 Spark 集群上运行我的代码,并想知道那里是否可以发生合并。
问题:
- UDAF 在哪些情况/条件下进行合并?
- 带有 orderBy 的 Windows 是否曾经进行过合并?
- 是否可以告诉 Spark 不要进行合并?