2

Spark UDF 包含以下功能:可为空、确定性、数据类型等。因此根据这些信息,它将受益于诸如 ConstantFolding 之类的优化。它还受益于哪些其他优化,哪些优化不能受益?我之所以这么问,是因为许多演示文稿将 UDF 视为一个黑匣子,它不会从催化剂优化中受益,但很明显,它会从 ConstantFolding 中受益。

4

1 回答 1

3

Spark 通过将 UDF 包装在一个类中来处理它们。例如,当您编写以下内容时:

val example = udf((a: Int) => a * 2)

udf函数所做的是创建一个UserDefinedFunction类,该类在其应用函数中创建一个ScalaUDF. ScalaUDF 扩展表达式,并在其 doCodeGen 方法中执行以下操作:

...
    val callFunc =
      s"""
         |$boxedType $resultTerm = null;
         |try {
         |  $resultTerm = ($boxedType)$resultConverter.apply($getFuncResult);
         |} catch (Exception e) {
         |  throw new org.apache.spark.SparkException($errorMsgTerm, e);
         |}
       """.stripMargin

    ev.copy(code =
      code"""
         |$evalCode
         |${initArgs.mkString("\n")}
         |$callFunc
...

此函数将DataType列/表达式的 转换为 Scala 类型(因为您的 UDF 对 scala 类型进行操作),然后它调用您的 lambda。deterministic, nullable,anddataTypes是用户定义函数的包装器的函数,因为它扩展了 Expression,而不是您的函数。如果您想充分受益于它们,您将不得不编写一个扩展的自定义表达式Expression或其子类之一。

以以下为例:

val redundantUdf = udf((a: Long) => true)
someDf.filter(redundantUdf(someDf("col1"))).explain()

优化的逻辑计划如下所示:

Project [_1#5736 AS Type#5739, _2#5737L AS sts#5740L]
 +- Filter UDF(_2#5737L)
  +- LocalRelation [_1#5736, _2#5737L]

正如您所看到的,它正在执行过滤器,即使它是多余的并且总是评估为真。

鉴于以下情况:

someDf.filter(expr("true")).explain()

将给出以下优化的逻辑计划:

LocalRelation [Type#5739, sts#5740L]

它使用 PruneFilter 规则修剪过滤器。

这并不意味着所有优化都被排除在外,还有一些优化仍然适用于 UDF,例如CombineFilter结合来自两个过滤器的表达式,例如:

== Analyzed Logical Plan ==
_1: string, _2: string
Filter UDF(_1#2)
+- Filter UDF(_1#2)
   +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Filter (UDF(_1#2) && UDF(_1#2))
+- LocalRelation [_1#2, _2#3]

此优化之所以有效,是因为它仅依赖于deterministic字段,并且 UDF 默认情况下是确定性的。因此,UDF 将受益于不依赖于它包装的函数的简单优化。这是因为它是催化剂无法理解的格式,催化剂在树上运行,而您的闭包是 Scala 函数。还有其他一些 UDF 丢失的地方,例如指定生成的 java 代码和 spark 类型信息。

于 2019-04-23T22:16:13.710 回答