4

我正在尝试从包含 scala 函数定义的字符串中定义 spark(2.0) 中的 udf。这是代码段:

val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show

这给了我一个错误:

  Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
   at org.apache.spark.scheduler.Task.run(Task.scala:85)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)

但是,当我将 udf 定义为:

val f = udf((s:String) => 5)

它工作得很好。这里有什么问题?最终目标是获取一个具有 scala 函数定义的字符串并将其用作 udf。

4

2 回答 2

5

正如 Giovanny 所观察到的,问题在于类加载器不同(您可以通过调用.getClass.getClassLoader任何对象来进行更多研究)。然后,当工作人员试图反序列化您的反射函数时,一切都会崩溃。

这是一个不涉及任何类加载器黑客的解决方案。这个想法是将反思步骤转移到工人身上。我们最终将不得不重做反射步骤,但每个工人只能重做一次。我认为这是非常理想的——即使您只在主节点上进行了一次反射,您也必须为每个工作人员做大量工作才能让他们识别该功能。

val f = udf {
  new Function1[String,Int] with Serializable {
    import scala.reflect.runtime.universe._
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val toolbox = currentMirror.mkToolBox()
    lazy val func = {
      println("reflected function") // triggered at every worker
      toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}

然后,调用sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show就可以了。

随意评论println- 这只是一种计算反射发生次数的简单方法。这spark-shell --master 'local'只是一次,但spark-shell --master 'local[2]'它是两次。

这个怎么运作

UDF 会立即被评估,但在到达工作节点之前它永远不会被使用,因此惰性值toolbox只会func在工作人员上得到评估。此外,由于它们很懒惰,因此每个工人只能对它们进行一次评估。

于 2016-08-14T20:14:52.367 回答
3

我有同样的错误,它没有显示 ClassNotFoundException 因为 JavaDeserializationStream 类正在捕获异常,这取决于你的环境它失败了,因为它找不到你试图从你的 RDD/DataSet 执行的类但是它没有显示 ClassNotFoundError 。为了解决这个问题,我必须生成一个包含项目中所有类的 jar(包括函数和依赖项),并将 jar 包含在 spark 环境中

这对于独立集群

conf.setJars ( Array ("/fullpath/yourgeneratedjar.jar", "/fullpath/otherdependencies.jar") )

这是一个纱线集群

conf.set("spark.yarn.jars", "/fullpath/yourgeneratedjar.jar,/fullpath/otherdependencies.jar")
于 2016-08-11T23:42:22.510 回答