我正在努力添加Spark 3.1
和Scala 2.12
支持Kylo Data-Lake Management Platform。
我需要有关迁移以下功能的帮助:
/**
* Creates an {@link Accumulable} shared variable with a name for display in the Spark UI.
*/
@Nonnull
static <R, P1> Accumulable<R, P1> accumulable(@Nonnull final R initialValue, @Nonnull final String name, @Nonnull final AccumulableParam<R, P1> param,
@Nonnull final KyloCatalogClient<Dataset<Row>> client) {
return ((KyloCatalogClientV2) client).getSparkSession().sparkContext().accumulable(initialValue, name, param);
}
/**
* Applies the specified function to the specified field of the data set.
*/
@Nonnull
static Dataset<Row> map(@Nonnull final Dataset<Row> dataSet, @Nonnull final String fieldName, @Nonnull final Function1 function, @Nonnull final DataType returnType) {
final Seq<Column> inputs = Seq$.MODULE$.<Column>newBuilder().$plus$eq(dataSet.col(fieldName)).result();
final UserDefinedFunction udf = new UserDefinedFunction(function, returnType, Option$.MODULE$.<Seq<DataType>>empty());
return dataSet.withColumn(fieldName, udf.apply(inputs));
}
我正在添加一个新的 Maven 模块kylo-spark-catalog-spark-v3
来支持apache-spark:3.1.2
并scala:2.12.10
在撰写本文时。
我在以下方面遇到了麻烦:
- 创建一个实例
AccumulatorV2
作为类的弃用通知Accumulable
不是很清楚。这是我对第一个功能的尝试-未编译:
@Nonnull
static <R, P1> AccumulatorV2<R, P1> accumulable(@Nonnull final R initialValue, @Nonnull final String name, @Nonnull final AccumulatorV2<R, P1> param,
@Nonnull final KyloCatalogClient<Dataset<Row>> client) {
AccumulatorV2<R, P1> acc = AccumulatorContext.get(AccumulatorContext.newId()).get();
acc.register(((KyloCatalogClientV3) client).getSparkSession().sparkContext(), new Some<>(name), true);
return acc;
}
- 在第二个函数中创建 UDF 的实例,
UserDefinedFunction
似乎抱怨它不能被实例化为它的抽象类。这是我对第二个功能的尝试-编译但不确定是否有意义:
/**
* Applies the specified function to the specified field of the data set.
*/
@Nonnull
static Dataset<Row> map(@Nonnull final Dataset<Row> dataSet, @Nonnull final String fieldName, @Nonnull final Function1 function, @Nonnull final DataType returnType) {
final Seq<Column> inputs = Seq$.MODULE$.<Column>newBuilder().$plus$eq(dataSet.col(fieldName)).result();
final UserDefinedFunction udf = udf(function, returnType);
return dataSet.withColumn(fieldName, udf.apply(inputs));
}
您能否就如何解决这个问题向我提出建议,或者是否有与此案相近的文档。