1

我想为 SchemaRDD 引入自定义类型(UDT),目前正在参照这个示例进行实现。
但我遇到了 Kryo 序列化问题,堆栈跟踪如下:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: 
    Lost task 0.0 in stage 6.0 (TID 22, localhost): 
    com.esotericsoftware.kryo.KryoException: Unable to find class: com.gis.io.GeometryWritable
    Serialization trace:
    value (org.apache.spark.sql.catalyst.expressions.MutableAny)
    values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
       at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
       at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
       at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
       at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
       at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
       at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
       at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
       at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
       at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
       at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
       at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
       at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
       at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
       at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80)
       at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46)
       at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45)
       at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
       at org.apache.spark.scheduler.Task.run(Task.scala:56)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: com.gis.io.GeometryWritable
       at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
       at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
       at java.security.AccessController.doPrivileged(Native Method)
       at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
       at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
       at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
       at java.lang.Class.forName0(Native Method)
       at java.lang.Class.forName(Class.java:270)
       at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
       ... 52 more

其中 com.gis.io.GeometryWritable 是我为其创建了 UDT 的自定义类,它包含在应用程序(APP)jar 中。我还按照 Google 论坛中的建议,把 Spark 默认的序列化器从 Kryo 改成了 Java 序列化,但问题依旧。有什么建议吗?是不是我遗漏了什么?以下是我的类:

    /**
     * Mutable holder for a `Geometry` value that Spark SQL maps through [[GeometryUDT]]
     * (linked via the `@SQLUserDefinedType` annotation).
     *
     * Also implements `Writable` (presumably Hadoop's `org.apache.hadoop.io.Writable` — confirm
     * the import) and `Serializable`.
     *
     * NOTE(review): the method bodies below appear as `{}` in the question and are presumably
     * elided. If they are genuinely empty, nothing is persisted: `write`/`readFields` ignore the
     * geometry, and the private `writeObject`/`readObject` hooks replace Java's default field
     * serialization without writing or reading `_geometry`, so a Java-deserialized instance
     * would come back with `_geometry == null`.
     */
    @SQLUserDefinedType(udt = classOf[GeometryUDT])
    class GeometryWritable(var _geometry: Geometry) extends Writable with Serializable {

       // Accessor/mutator pair exposing the backing `_geometry` field as a `geometry` property.
       def geometry = _geometry
       def geometry_=(geometry: Geometry) = _geometry = geometry

       // No-arg constructor that starts with a null geometry; presumably needed by code that
       // instantiates the class reflectively before calling `readFields` — confirm.
       def this() = this(null)

       // Writable contract: write/read the geometry to/from a binary stream
       // (bodies presumably elided in the question).
       override def write(dataOutput: DataOutput) : Unit = {}
       override def readFields(dataInput: DataInput) : Unit = {}
       // Java-serialization hooks: when defined, ObjectOutputStream/ObjectInputStream call these
       // instead of performing default field serialization for this class.
       @throws(classOf[IOException])
       private def writeObject(stream: ObjectOutputStream): Unit = {}
       @throws(classOf[IOException])
       private def readObject(stream: ObjectInputStream): Unit = {}
    }

    /**
     * Spark SQL user-defined type that converts [[GeometryWritable]] values to and from the
     * column type returned by `sqlType` (`ArrayType(ByteType)`).
     *
     * NOTE(review): `serialize` is declared to return `Array[Byte]`, but the declared SQL type is
     * `ArrayType(ByteType)`; Spark SQL's type for a raw byte array is `BinaryType` — confirm
     * which representation is intended.
     * NOTE(review): the `{}` bodies are presumably elided in the question; as written they are
     * `Unit` and would not type-check against `Array[Byte]` / `GeometryWritable`.
     */
    class GeometryUDT extends UserDefinedType[GeometryWritable] with Serializable {

      // Underlying Catalyst type used to store the serialized geometry.
      override def sqlType: DataType = ArrayType(ByteType)
      // Converts a user object (presumably a GeometryWritable) into the sqlType representation.
      override def serialize(obj: Any): Array[Byte] = {}
      // Rebuilds a GeometryWritable from a stored value of sqlType.
      override def deserialize(datum: Any): GeometryWritable = {}
      // The JVM class this UDT handles.
      override def userClass: Class[GeometryWritable] = classOf[GeometryWritable]

   }

以下是我的使用方式:

    // Parse each input line "<y> <x>" (space-separated) into a Point, and wrap it in a
    // single-column Row holding a GeometryWritable.
    val rdd = sc.textFile(args(0)).map { line =>
      // Split once per record instead of re-splitting for each coordinate.
      val fields = line.split(" ")
      val point = new Point
      point.setY(fields(0).toDouble)
      point.setX(fields(1).toDouble)
      Row.fromSeq(Seq(new GeometryWritable(point)))
    }
    // One nullable column whose type is the custom GeometryUDT.
    val schema = StructType(Seq(StructField("Geometry", new GeometryUDT, nullable = true)))

    val schemaRDD = sqlContext.applySchema(rdd, schema).persist(StorageLevel.MEMORY_AND_DISK)

编辑:实际上我发现,如果只使用一个 RDD(表),自定义 UDT 可以正常工作;只有对两个或多个 RDD 执行 JOIN 时才会出现问题。据此看来,在使用自定义 Row 并执行 JOIN 时这是一个 bug。有什么线索吗?

4

0 回答 0