I want to pass an object from the driver node to the other nodes where the RDD lives, so that every partition of the RDD can access that object, as shown in the snippet below.
package xt

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

object HelloSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Testing HelloSpark")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "xt.HelloKryoRegistrator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 20, 4)
    val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))
    // The map closure captures `bytes`, so Spark must ship it to the executors.
    rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
      .collect()
      .foreach(println)

    sc.stop()
  }
}
// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Register the custom serializer for ImmutableBytesWritable
    kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
  }
}
// My serializer
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
  override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
    output.writeInt(obj.getLength)
    output.writeInt(obj.getOffset)
    // Write only the live window of the backing array
    output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
  }

  override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
    val length = input.readInt()
    val offset = input.readInt() // offset of the original buffer; the copy below starts at 0
    val bytes = new Array[Byte](length)
    input.readBytes(bytes) // fill the freshly allocated array completely
    new ImmutableBytesWritable(bytes)
  }
}
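For what it's worth, here is a minimal sketch of how I sanity-check the serializer outside Spark by driving Kryo by hand (the object name SerializerCheck is mine, not part of the application):

// Standalone round-trip check of HelloSerializer (assumes the Kryo 2.x API
// bundled with Spark; not part of the actual job).
import java.io.ByteArrayOutputStream

object SerializerCheck {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())

    // Serialize one instance to an in-memory buffer...
    val baos = new ByteArrayOutputStream()
    val out = new Output(baos)
    kryo.writeObject(out, new ImmutableBytesWritable(Bytes.toBytes("This is a test")))
    out.close()

    // ...and read it back through the same serializer.
    val in = new Input(baos.toByteArray)
    val restored = kryo.readObject(in, classOf[ImmutableBytesWritable])
    println(Bytes.toString(restored.get())) // expected: This is a test
  }
}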
In the snippet above, I am trying to have ImmutableBytesWritable serialized by Kryo in Spark, so I did the following:
- configured the SparkConf instance passed to the SparkContext, i.e., set "spark.serializer" to "org.apache.spark.serializer.KryoSerializer" and "spark.kryo.registrator" to "xt.HelloKryoRegistrator";
- wrote a custom Kryo registrator class, in which I register the class ImmutableBytesWritable (an alternative registration API is sketched after this list);
- wrote a serializer for ImmutableBytesWritable.
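For completeness, I am also aware of the SparkConf.registerKryoClasses helper (added in Spark 1.2, so possibly not available in my version), which replaces a custom registrator when Kryo's default FieldSerializer is good enough; a sketch, assuming the default serializer can handle ImmutableBytesWritable's fields:

val conf = new SparkConf()
  .setAppName("Testing HelloSpark")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registers the class with Kryo's default serializer instead of HelloSerializer
  .registerKryoClasses(Array(classOf[ImmutableBytesWritable]))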
However, when I submitted my Spark application in yarn-client mode, the following exception was thrown:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at xt.HelloSpark$.main(HelloSpark.scala:23)
    at xt.HelloSpark.main(HelloSpark.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:325)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more
It seems that ImmutableBytesWritable cannot be serialized by Kryo. So what is the correct way to let Spark serialize an object with Kryo? Can Kryo serialize any type?
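For context, here is a minimal sketch of the workaround I am considering, under the assumption that the exception comes from Java serialization of the task closure rather than from Kryo itself: broadcast the value, so that the closure only captures the (serializable) broadcast handle while the payload itself travels through the configured spark.serializer.

// Workaround sketch (assumption: broadcast data, unlike closures, goes
// through spark.serializer, i.e. Kryo here).
val bcBytes = sc.broadcast(bytes)
rdd.map(x => x.toString + "-" + Bytes.toString(bcBytes.value.get) + " !")
  .collect()
  .foreach(println)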