1

这是我的代码:

  val bg = imageBundleRDD.first()    //bg:[Text, BundleWritable]
  val res= imageBundleRDD.map(data => {
                                val desBundle = colorToGray(bg._2)        //lineA:NotSerializableException: org.apache.hadoop.io.Text
                                //val desBundle = colorToGray(data._2)    //lineB:everything is ok
                                (data._1, desBundle)
                             })
  println(res.count)

lineB 运行良好,但 lineA 显示:org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.io.Text

我尝试使用 Kryo 来解决我的问题,但似乎没有任何改变:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
       kryo.register(classOf[Text])
       kryo.register(classOf[BundleWritable])
  }
}

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...

谢谢!!!

4

3 回答 3

1

当我的 Java 代码读取包含 Text 键的序列文件时,我遇到了类似的问题。我发现这篇文章很有帮助:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-solve-java-io-NotSerializableException-org-apache-hadoop-io-Text-td2650.html

就我而言,我使用 map 将 Text 转换为 String:

JavaPairRDD<String, VideoRecording> mapped = videos.map(new PairFunction<Tuple2<Text,VideoRecording>,String,VideoRecording>() {
    @Override
    public Tuple2<String, VideoRecording> call(
            Tuple2<Text, VideoRecording> kv) throws Exception {
        // Necessary to copy value as Hadoop chooses to reuse objects
        VideoRecording vr = new VideoRecording(kv._2);
        return new Tuple2(kv._1.toString(), vr);
    }
});

请注意 JavaSparkContext 中 sequenceFile 方法的 API 中的此注释:

注意:因为 Hadoop 的 RecordReader 类对每条记录重复使用相同的 Writable 对象,所以直接缓存返回的 RDD 会创建对同一对象的许多引用。如果您打算直接缓存 Hadoop 可写对象,则应首先使用映射函数复制它们。

于 2014-04-05T18:55:01.783 回答
1

Apache Spark中处理序列文件时,我们必须遵循以下技术:

-- 使用 Java 等效数据类型代替 Hadoop 数据类型。
 -- Spark 自动将 Writables 转换为 Java 等效类型。

例如:- 我们有一个序列文件“xyz”,这里的键类型是文本和值
是可长写的。当我们使用这个文件来创建一个 RDD 时,我们需要使用他们的
java等效数据类型,即分别为String和Long。

 val mydata = = sc.sequenceFile[String, Long]("path/to/xyz")
 我的数据收集

于 2016-12-16T19:17:07.407 回答
0

您的代码存在序列化问题的原因是您的 Kryo 设置虽然关闭,但并不完全正确:

改变:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...

至:

val sparkConf = new SparkConf()
  // ... set master, appname, etc, then:
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")

val sc = new SparkContext(sparkConf)
于 2015-08-31T00:22:30.877 回答