0

我正在努力添加Spark 3.1Scala 2.12支持Kylo Data-Lake Management Platform

我需要有关迁移以下功能的帮助:

    /**
     * Creates an {@link Accumulable} shared variable with a name for display in the Spark UI.
     */
    @Nonnull
    static <R, P1> Accumulable<R, P1> accumulable(@Nonnull final R initialValue, @Nonnull final String name, @Nonnull final AccumulableParam<R, P1> param,
                                                  @Nonnull final KyloCatalogClient<Dataset<Row>> client) {
        return ((KyloCatalogClientV2) client).getSparkSession().sparkContext().accumulable(initialValue, name, param);
    }
/**
     * Applies the specified function to the specified field of the data set.
     */
    @Nonnull
    static Dataset<Row> map(@Nonnull final Dataset<Row> dataSet, @Nonnull final String fieldName, @Nonnull final Function1 function, @Nonnull final DataType returnType) {
        final Seq<Column> inputs = Seq$.MODULE$.<Column>newBuilder().$plus$eq(dataSet.col(fieldName)).result();
        final UserDefinedFunction udf = new UserDefinedFunction(function, returnType, Option$.MODULE$.<Seq<DataType>>empty());
        return dataSet.withColumn(fieldName, udf.apply(inputs));
    }

可以在这里这里找到

我正在添加一个新的 Maven 模块kylo-spark-catalog-spark-v3来支持apache-spark:3.1.2scala:2.12.10在撰写本文时。

我在以下方面遇到了麻烦:

  1. 创建一个实例AccumulatorV2作为类的弃用通知Accumulable不是很清楚。这是我对第一个功能的尝试-未编译
    @Nonnull
    static <R, P1> AccumulatorV2<R, P1> accumulable(@Nonnull final R initialValue, @Nonnull final String name, @Nonnull final AccumulatorV2<R, P1> param,
                                                  @Nonnull final KyloCatalogClient<Dataset<Row>> client) {
        AccumulatorV2<R, P1> acc = AccumulatorContext.get(AccumulatorContext.newId()).get();
        acc.register(((KyloCatalogClientV3) client).getSparkSession().sparkContext(), new Some<>(name), true);
        return acc;
    }
  1. 在第二个函数中创建 UDF 的实例,UserDefinedFunction似乎抱怨它不能被实例化为它的抽象类。这是我对第二个功能的尝试-编译但不确定是否有意义:
    /**
     * Applies the specified function to the specified field of the data set.
     */
    @Nonnull
    static Dataset<Row> map(@Nonnull final Dataset<Row> dataSet, @Nonnull final String fieldName, @Nonnull final Function1 function, @Nonnull final DataType returnType) {
        final Seq<Column> inputs = Seq$.MODULE$.<Column>newBuilder().$plus$eq(dataSet.col(fieldName)).result();
        final UserDefinedFunction udf = udf(function, returnType);
        return dataSet.withColumn(fieldName, udf.apply(inputs));
    }

您能否就如何解决这个问题向我提出建议,或者是否有与此案相近的文档。

4

1 回答 1

1

1.accumulable


旧版本的 Spark 有两个用于累加器的 API:Accumulable(当输入和输出类型不同时)和Accumulator(当输入和输出类型相同时,即Accumulable<T,T>)。要创建一个Accumulable<OUT, IN>实例,您需要一个AccumulableParam<OUT, IN>已定义的“合并”操作 (OUT+OUT)、“添加”操作 (OUT+IN) 和 OUT 类型的零值。

AccumulatorV2的组织方式不同。它是一个抽象类,需要用addmerge操作来扩展;以及对什么是零值的理解。您可以在 Spark 的源代码中看到CollectionAcumulator的示例实现。

accumulable使用 AccumulatorV2 API 更改功能的责任。它不再需要创建累加器。它只需要在 SparkContext 中使用给定名称注册它。我认为以下是有道理的:

@Nonnull
static <R, P1> AccumulatorV2<P1, R> accumulable(
    @Nonnull final R initialValue, // unused
    @Nonnull final String name,
    @Nonnull final AccumulatorV2<P1, R> acc,
    @Nonnull final KyloCatalogClient<Dataset<Row>> client
) {
    ((KyloCatalogClientV2) client).getSparkSession().sparkContext().register(acc, name);
    return acc;
}

需要注意的几点:

  • 类签名中的类型顺序已更改:Accumulable<OUT, IN> vs AccumulatorV2<IN, OUT>
  • initialValue需要在创建时传递给累加器。但是,您需要确保copyAndReset返回一个具有零值的新累加器,而不是initialValue. 或者,您需要添加一个 P1 类型的值,该值在添加到累加器后会导致累加器返回 R 类型的预期初始值。这不是一个好主意,只是一个替代方案。
  • 另外,不要使用AccumulatorContext,因为根据文档它是

Spark 本身用于跟踪累加器的内部类。

2.map


我认为您的功能是正确且有意义的。在 Spark v2.0 和 v3.1 之间UserDefinedFunction更改为抽象类,现在您使用udf 函数( import static org.apache.spark.sql.functions.udf)对其进行实例化

于 2021-08-04T15:53:09.597 回答