0

我编写了一个自定义Aggregator( 的扩展org.apache.spark.sql.expressions.Aggregator),Spark 将其作为语句下的聚合函数正确调用group by

sparkSession
    .createDataFrame(...)
    .groupBy(col("id"))
    .agg(
        new MyCustomAggregator().toColumn().name("aggregation_result"))
    .show();

不过,我想在窗口函数中使用它,因为排序对我很重要。我试过这样调用它:

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", new MyCustomAggregator().toColumn().over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();

这就是我得到的错误:

org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `id` ORDER BY `order` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.

是否可以在 Spark 3.0.1 中使用自定义聚合器作为窗口函数?如果是这样,我在这里错过了什么?

4

1 回答 1

0

是的,Spark 3 确实支持自定义聚合器作为窗口函数。

这是Java代码:

UserDefinedFunction myCustomAggregation = functions.udaf(new MyCustomAggregator(), Encoders.bean(AggregationInput.class));

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", myCustomAggregation.apply(col("aggregation_input1"), col("aggregation_input2")).over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();

AggregationInput这是一个简单的 DTO,其中包含聚合函数所需的行元素。

因此,无论您是聚合下group by还是作为窗口函数,您仍然想使用org.apache.spark.sql.expressions.Aggregator.

于 2020-12-02T07:49:28.500 回答