1

我想将 scala 对象方法注册到 spark udf 中。现在我通过 scala reflect 获得 MethodMirror,通过 java reflect 获得 Parameter。但我无法生成用于注册到 spark.udf 的函数对象。如:

object ArrayUdfs {
  def array2String(arr: Seq[Long])= {
    arr.mkString(",")
  }
}

我想将方法​​“array2String”注册到 spark udf 中。

首先我得到MethodMirror:

def getObjectMethod(clazzPath:String, methodName:String)  = {
   import scala.reflect.runtime.universe

   lazy val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
   lazy val module = runtimeMirror.staticModule(clazzPath)
   lazy val obj = runtimeMirror.reflectModule(module)
   lazy val objMirror = runtimeMirror.reflect(obj.instance)
   lazy val method = obj.symbol.typeSignature.member(universe.TermName(methodName)).asMethod
   lazy val methodObject = objMirror.reflectMethod(method)
   methodObject
}

lazy val method = getObjectMethod(clazzPath, funName);

第二。我得到参数。

def getObjectMethodParams(clazzPath:String, methodName:String): Array[Parameter] = {
    val methods = Class.forName(clazzPath).getDeclaredMethods()
    var params: Array[Parameter] = null;
    methods.foreach(method => {
      if (method.getName == methodName) params = method.getParameters
    })
    params
}

val params = getObjectMethodParams(clazzPath, funName)

三、注册到spark.udf

val function1: Function1[Seq[String], String] = (arr) => {
   method.apply(arr).asInstanceOf[String]
}

spark.udf.register("array2String", function1)

所以我想用'params'对象替换'Function1 [Seq [String],String]'中的'Seq [String]'。'Function1[Seq[String], String]' 中的 'String' 是相同的。

4

0 回答 0