1

SparkSessionExtensions injectionFunction 在本地工作,但我无法让它在 Databricks 环境中工作。

itachi项目定义了 Catalyst 表达式,就像我可以通过以下方式在age本地成功使用一样spark-sql

bin/spark-sql --packages com.github.yaooqinn:itachi_2.12:0.1.0 --conf spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions
spark-sql> select age(timestamp '2000', timestamp'1990');
10 years

我无法在 Databricks 环境中使用它。

spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions我使用配置选项集启动了一个 Databricks 社区集群。

创建集群

然后我附上了图书馆。

附加库

itachi 中定义的array_append函数无法像我预期的那样访问:

无法运行功能

确认配置选项设置正确:

在此处输入图像描述

spark-alchemy 有另一种适用于 Databricks 环境的方法。我们是否需要弄乱 Spark 内部结构才能使其在 Databricks 环境中工作?或者有没有办法injectFunction在 Databricks 中工作?

4

1 回答 1

1

在完整的spark.sql.extensionsDatabricks 上工作得很好(直到它深入到 Spark 的内部 - 有时存在不兼容性),但在社区版上却不行。问题是spark.sql.extensions在会话初始化期间调用,并且在 UI 中指定的库是在之后安装的,所以这发生在初始化之后/并行。在完整的 Databricks 上,可以通过在集群启动之前使用init 脚本安装库来解决此问题,但此功能在 Community Edition 上不可用。

解决方法是显式注册函数,如下所示:

%scala
import org.apache.spark.sql.catalyst.expressions.postgresql.{Age, ArrayAppend, ArrayLength, IntervalJustifyLike, Scale, SplitPart, StringToArray, UnNest}
import org.apache.spark.sql.extra.FunctionAliases

spark.sessionState.functionRegistry.registerFunction(Age.fd._1, Age.fd._2, Age.fd._3)
spark.sessionState.functionRegistry.registerFunction(FunctionAliases.array_cat._1, FunctionAliases.array_cat._2, FunctionAliases.array_cat._3)
spark.sessionState.functionRegistry.registerFunction(ArrayAppend.fd._1, ArrayAppend.fd._2, ArrayAppend.fd._3)
spark.sessionState.functionRegistry.registerFunction(ArrayLength.fd._1, ArrayLength.fd._2, ArrayLength.fd._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyDays._1, IntervalJustifyLike.justifyDays._2, IntervalJustifyLike.justifyDays._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyHours._1, IntervalJustifyLike.justifyHours._2, IntervalJustifyLike.justifyHours._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyInterval._1, IntervalJustifyLike.justifyInterval._2, IntervalJustifyLike.justifyInterval._3)
spark.sessionState.functionRegistry.registerFunction(Scale.fd._1, Scale.fd._2, Scale.fd._3)
spark.sessionState.functionRegistry.registerFunction(SplitPart.fd._1, SplitPart.fd._2, SplitPart.fd._3)
spark.sessionState.functionRegistry.registerFunction(StringToArray.fd._1, StringToArray.fd._2, StringToArray.fd._3)
spark.sessionState.functionRegistry.registerFunction(UnNest.fd._1, UnNest.fd._2, UnNest.fd._3)

之后它的工作原理:

在此处输入图像描述

它不像扩展那么方便,但这是 CE 的限制。

于 2021-03-20T13:01:26.717 回答