目前,当我尝试在独立模式下使用 Cassandra 运行 Spark 时遇到一些问题。
最初,我在 SparkContext 中使用参数 mater="local[4]" 成功运行。
然后,我尝试进入独立模式。我使用的是:
Ubuntu:12.04 Cassandra:1.2.11 Spark:0.8.0 Scala:2.9.3 JDK:Oracle 1.6.0_35 Kryo:2.21
起初,我收到“未读块”错误。作为其他主题的建议,我更改为使用 Kryo 序列化程序并添加 Twitter Chill。然后,我在控制台中收到“注册 spark.kryo.registrator 失败”和如下异常:
13/10/28 12:12:36 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 0.0:0)
13/10/28 12:12:36 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.EOFException
java.io.EOFException
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:109)
at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:150)
at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:129)
at java.io.ObjectInputStream.readExternalData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
之前也有人在spark中遇到过EOFException,答案是没有正确注册registrator。我按照 Spark 指南注册了 Registrator。注册人如下:
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[org.apache.spark.rdd.RDD[(Map[String, ByteBuffer], Map[String, ByteBuffer])]])
kryo.register(classOf[String], 1)
kryo.register(classOf[Map[String, ByteBuffer]], 2)
}
}
我也像指南一样设置属性。
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "main.scala.MyRegistrator")
谁能给我一些提示我做错了什么?谢谢。