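I am writing some test applications with the Java Spark API. I use a class that does not implement the Serializable interface, so to make the application work I use the Kryo serializer to serialize it. While debugging, however, I observed that during deserialization the returned class object is null, which in turn throws a NullPointerException. It looks like a closure problem, but I am not sure. Since I am new to this kind of serialization, I do not know where to start digging.

Here is the code I am testing: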

package org.apache.spark.examples;


import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;




/**
 * Spark application to test the Serialization issue in spark
 */
public class Test {

    static PrintWriter outputFileWriter;
    static FileWriter file;
    static JavaSparkContext ssc;

    public static void main(String[] args) {


        String inputFile = "/home/incubator-spark/examples/src/main/scala/org/apache/spark/examples/InputFile.txt";

        String master = "local";
        String jobName = "TestSerialization";
        String sparkHome = "/home/test/Spark_Installation/spark-0.7.0";
        String sparkJar = "/home/test/TestSerializationIssesInSpark/TestSparkSerIssueApp/target/TestSparkSerIssueApp-0.0.1-SNAPSHOT.jar";


        SparkConf conf = new SparkConf();
        conf.set("spark.closure.serializer","org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator");
        // create the Spark context
        if(master.equals("local")){
            ssc = new JavaSparkContext("local", jobName,conf);
            //ssc = new JavaSparkContext("local", jobName);
        } else {
            ssc = new JavaSparkContext(master, jobName, sparkHome, sparkJar);
        }
        JavaRDD<String> testData = ssc.textFile(inputFile).cache();
        final NotSerializableJavaClass notSerializableTestObject= new NotSerializableJavaClass("Hi ");
        @SuppressWarnings({ "serial", "unchecked"})
        JavaRDD<String> classificationResults = testData.map(
                new Function<String, String>() {
                    @Override
                    public String call(String inputRecord) throws Exception {                   
                        if(!inputRecord.isEmpty()) {
                            //String[] pointDimensions = inputRecord.split(",");
                            String result = "";

                            try {
                                FileWriter file = new FileWriter("/home/test/TestSerializationIssesInSpark/results/test_result_" + (int) (Math.random() * 100));
                                PrintWriter outputFile = new PrintWriter(file); 
                                InetAddress ip;
                                ip = InetAddress.getLocalHost();
                                outputFile.println("IP of the server: " + ip);

                                result = notSerializableTestObject.testMethod(inputRecord);
                                outputFile.println("Result: " + result);

                                outputFile.flush();
                                outputFile.close();
                                file.close();

                            } catch (UnknownHostException e) {
                                e.printStackTrace();
                            }
                            catch (IOException e1) {
                                e1.printStackTrace();
                            } 

                            return result;
                        } else {
                            System.out.println("End of elements in the stream.");
                            String result = "End of elements in the input data";
                            return result;
                        }
                    }

                }).cache(); 

        long processedRecords = classificationResults.count();

        ssc.stop();
        System.out.println("sssssssssss"+processedRecords);
    }
}

Here is the KryoRegistrator class:

package org.apache.spark.examples;

import org.apache.spark.serializer.KryoRegistrator;

import com.esotericsoftware.kryo.Kryo;

public class MyRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(NotSerializableJavaClass.class);
    }
}

Here is the class I want to serialize:

package org.apache.spark.examples;

public class NotSerializableJavaClass {
    public String testVariable;

    public NotSerializableJavaClass(String testVariable) {
        super();
        this.testVariable = testVariable;
    }

    public String testMethod(String vartoAppend){
        return this.testVariable + vartoAppend;
    }
}

1 Answer

This is because spark.closure.serializer supports only the Java serializer. See the entry for spark.closure.serializer at http://spark.apache.org/docs/latest/configuration.html.
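
One possible workaround, given as a minimal sketch rather than anything from the Spark documentation: since the closure goes through Java serialization, keep only serializable state (here a String) in the function and rebuild the non-serializable helper lazily on the worker side. The sketch deliberately avoids Spark so that it runs on its own. StringFunction is a hypothetical stand-in for Spark's Function<String, String>, and CapturingFunction, LazyFunction, roundTrip and failsToSerialize are illustrative names, not Spark APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationSketch {

    // Same shape as the class in the question: it does not implement Serializable.
    static class NotSerializableJavaClass {
        final String testVariable;
        NotSerializableJavaClass(String testVariable) { this.testVariable = testVariable; }
        String testMethod(String varToAppend) { return testVariable + varToAppend; }
    }

    // Hypothetical stand-in for a Spark function type, which must be Serializable.
    interface StringFunction extends Serializable {
        String call(String input);
    }

    // Holds the helper in a regular field, so Java serialization has to write it and fails.
    static class CapturingFunction implements StringFunction {
        private final NotSerializableJavaClass helper;
        CapturingFunction(NotSerializableJavaClass helper) { this.helper = helper; }
        public String call(String input) { return helper.testMethod(input); }
    }

    // Workaround: ship only the String; the transient helper is skipped during
    // serialization and rebuilt on first use after deserialization.
    static class LazyFunction implements StringFunction {
        private final String prefix;
        private transient NotSerializableJavaClass helper;
        LazyFunction(String prefix) { this.prefix = prefix; }
        public String call(String input) {
            if (helper == null) {
                helper = new NotSerializableJavaClass(prefix);
            }
            return helper.testMethod(input);
        }
    }

    // Serializes and deserializes an object with plain Java serialization.
    static Object roundTrip(Object o) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    // Returns true when Java serialization rejects the object graph.
    static boolean failsToSerialize(Object o) {
        try {
            roundTrip(o);
            return false;
        } catch (NotSerializableException e) {
            return true;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        StringFunction capturing = new CapturingFunction(new NotSerializableJavaClass("Hi "));
        System.out.println("capturing function fails: " + failsToSerialize(capturing));

        StringFunction lazy = (StringFunction) roundTrip(new LazyFunction("Hi "));
        System.out.println(lazy.call("there"));
    }
}
```

In the question's code the same idea would mean creating NotSerializableJavaClass inside call() (or in a lazily initialised transient field of the function) instead of capturing notSerializableTestObject from main. If changing the class is acceptable, having it implement java.io.Serializable is the simpler fix. Kryo can still be enabled for RDD data through spark.serializer, which is a separate setting from spark.closure.serializer.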

Answered 2014-10-20T09:31:20.573