5

我想知道在什么情况下 Spark 将作为 UDAF 功能的一部分执行合并。

动机: 我在我的 Spark 项目中的一个窗口上使用了很多 UDAF 函数。我经常想回答这样的问题:

在 30 天的窗口内,与当前交易在同一国家/地区进行了多少次信用卡交易?

该窗口将从当前事务开始,但不会将其包括在计数中。它需要当前交易的价值才能知道过去 30 天内要计算哪个国家/地区。

val rollingWindow = Window
      .partitionBy(partitionByColumn)
      .orderBy(orderByColumn.desc)
      .rangeBetween(0, windowSize)

df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))

我写了我的 customUDAF 来进行计数。我总是使用.orderBy(orderByColumn.desc)并感谢.desc当前交易在计算过程中首先出现在窗口中。

UDAF 函数需要实现merge在并行计算中合并两个中间聚合缓冲区的函数。如果发生任何合并,current transaction不同缓冲区的 my 可能不一样,UDAF 的结果将不正确。

我编写了一个 UDAF 函数,它计算我的数据集上的合并次数,并只保留窗口中的第一个事务以与当前事务进行比较。

 class FirstUDAF() extends UserDefinedAggregateFunction {

  def inputSchema = new StructType().add("x", StringType)
    .add("y", StringType)

  def bufferSchema = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)

  def dataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)

  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = ""
    buffer(1) = 1
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getString(0) == "")
      buffer(0) = input.getString(0)

  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }

  def evaluate(buffer: Row) = buffer
}

当我在具有 16 个 cpu 的本地主机上使用 spark 2.0.1 运行它时,从来没有任何合并,并且窗口中的第一个事务始终是当前事务。这就是我要的。在不久的将来,我将在 x100 更大的数据集和真正的分布式 Spark 集群上运行我的代码,并想知道那里是否可以发生合并。

问题:

  • UDAF 在哪些情况/条件下进行合并?
  • 带有 orderBy 的 Windows 是否曾经进行过合并?
  • 是否可以告诉 Spark 不要进行合并?
4

1 回答 1

3

UDAF 在哪些情况/条件下进行合并?

merge当聚合函数的部分应用程序(“map side aggregation”)在shuffle(“reduce side aggregation”)之后被合并时被调用。

带有 orderBy 的 Windows 是否曾经进行过合并?

当前的实现中永远不会。至于现在的窗口函数只是花哨groupByKey的,并没有部分聚合。这当然是实现细节,将来可能会更改,恕不另行通知。

是否可以告诉 Spark 不要进行合并?

它不是。但是,如果数据已经被聚合键分区,则不需要merge并且仅combine使用。

最后:

在 30 天的窗口内,与当前交易在同一国家/地区进行了多少次信用卡交易?

不调用UDAFs或窗口函数。我可能会创建翻滚窗口o.a.s.sql.functions.window,按用户、国家和窗口聚合并返回输入。

于 2017-12-18T12:09:21.953 回答