我想为 SchemaRDD 引入自定义类型(UDT),目前正在参照这个示例来实现。
但是我遇到了 Kryo 序列化问题,堆栈跟踪如下:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 6.0 (TID 22, localhost):
com.esotericsoftware.kryo.KryoException: Unable to find class: com.gis.io.GeometryWritable
Serialization trace:
value (org.apache.spark.sql.catalyst.expressions.MutableAny)
values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80)
at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46)
at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.gis.io.GeometryWritable
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 52 more
其中 com.gis.io.GeometryWritable 是我为其创建 UDT 的自定义类,它存在于应用程序(APP)的 jar 包中。我也按照 Google 讨论组中的说法,尝试把 Spark 的默认序列化器从 Kryo 改为 Java 序列化,但这对我没有帮助。有什么建议吗?我是不是遗漏了什么?以下是我的类:
/**
 * Mutable wrapper around a `Geometry` value that mixes in `Writable` and
 * `Serializable` and is annotated with `GeometryUDT` as its SQL user-defined type.
 *
 * NOTE(review): every method body below is empty in this excerpt; presumably the
 * real implementations were elided — confirm before relying on any of them.
 *
 * @param _geometry the wrapped geometry; it is `null` when the instance is created
 *                  through the no-argument constructor
 */
@SQLUserDefinedType(udt = classOf[GeometryUDT])
class GeometryWritable(var _geometry: Geometry) extends Writable with Serializable {
// Accessor for the wrapped geometry.
def geometry = _geometry
// Setter counterpart of `geometry`; replaces the wrapped geometry.
def geometry_=(geometry: Geometry) = _geometry = geometry
// No-argument constructor; the wrapped geometry starts out as null.
// NOTE(review): presumably required so frameworks can instantiate the class reflectively — confirm.
def this() = this(null)
// `Writable` output hook; the body is empty here, so nothing is written to `dataOutput`.
override def write(dataOutput: DataOutput) : Unit = {}
// `Writable` input hook; the body is empty here, so nothing is read from `dataInput`.
override def readFields(dataInput: DataInput) : Unit = {}
// NOTE(review): this has the signature of a Java serialization `writeObject` hook; the body is empty here.
@throws(classOf[IOException])
private def writeObject(stream: ObjectOutputStream): Unit = {}
// NOTE(review): this has the signature of a Java serialization `readObject` hook; the body is empty here.
@throws(classOf[IOException])
private def readObject(stream: ObjectInputStream): Unit = {}
}
/**
 * User-defined SQL type for `GeometryWritable` values.
 *
 * NOTE(review): `sqlType` is declared as `ArrayType(ByteType)` while `serialize`
 * is declared to return `Array[Byte]`; confirm that this pairing is the intended
 * storage representation (a binary type may have been meant).
 */
class GeometryUDT extends UserDefinedType[GeometryWritable] with Serializable {
// Underlying SQL data type used to store the serialized value.
override def sqlType: DataType = ArrayType(ByteType)
// Converts a user object to its stored form.
// NOTE(review): the `{}` body evaluates to Unit, which does not match the declared
// return type — the real implementation was presumably elided from this excerpt.
override def serialize(obj: Any): Array[Byte] = {}
// Converts a stored value back to a `GeometryWritable`.
// NOTE(review): the `{}` body evaluates to Unit, which does not match the declared
// return type — the real implementation was presumably elided from this excerpt.
override def deserialize(datum: Any): GeometryWritable = {}
// Runtime class of the user-facing type handled by this UDT.
override def userClass: Class[GeometryWritable] = classOf[GeometryWritable]
}
下面是我使用它的方式:
// Read the input file named by the first program argument and turn every line
// into a one-column Row holding a GeometryWritable. Each line is expected to be
// "<y> <x>" separated by a single space: token 0 feeds setY, token 1 feeds setX.
val rdd = sc.textFile(args(0)).map { line =>
  val point = new Point
  // Split once and reuse the tokens instead of re-splitting the line per coordinate.
  val tokens = line.split(" ")
  point.setY(tokens(0).toDouble)
  point.setX(tokens(1).toDouble)
  Row.fromSeq(Seq(new GeometryWritable(point)))
}
// Schema with a single nullable column "Geometry" typed by the custom UDT.
val schema = StructType(Seq(StructField("Geometry", new GeometryUDT, true)))
// Attach the schema to the row RDD and persist it with the MEMORY_AND_DISK storage level.
val schemaRDD = sqlContext.applySchema(rdd, schema).persist(StorageLevel.MEMORY_AND_DISK)
补充:实际上我发现,如果只使用一个 RDD(表),自定义 UDT 可以正常工作;只有在对两个或更多 RDD 进行 JOIN 时才会出现问题。据此来看,在使用自定义 Row 并执行 JOIN 时,这属于一个 bug。有什么线索吗?