
I'm running a Spark job to generate HFiles for my HBase data store.

It used to work fine on my Cloudera cluster, but after we switched to an EMR cluster it fails with the following stack trace:

Serialization stack:
    - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 50 31 36 31 32 37 30 33 34 5f 49 36 35 38 34 31 35 38 35); not retrying


Serialization stack:
    - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 50 31 36 31 32 37 30 33 34 5f 49 36 35 38 34 31 35 38 35)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1493)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1492)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1492)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1720)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1675)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1664)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:629)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1158)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1005)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:996)
    at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopFile(JavaPairRDD.scala:823)

My questions:

  1. What could cause the difference between the two runs? A version difference between the two clusters?
  2. I did some research and found this post, then added the Kryo parameters to my spark-submit command, which now looks like this:

         spark-submit --conf spark.kryo.classesToRegister=org.apache.hadoop.hbase.io.ImmutableBytesWritable,org.apache.hadoop.hbase.KeyValue \
             --master yarn --deploy-mode client --driver-memory 16G --executor-memory 18G ...

     Still, I get the same error.

Here's my Java code:

protected void generateHFilesUsingSpark(JavaRDD<Row> rdd) throws Exception {
        // Map each input row to an (ImmutableBytesWritable row key, KeyValue) pair.
        JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRdd = rdd.mapToPair(
            new PairFunction<Row, ImmutableBytesWritable, KeyValue>() {
                public Tuple2<ImmutableBytesWritable, KeyValue> call(Row row) throws Exception {
                    String key = (String) row.get(0);
                    String value = (String) row.get(1);

                    ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
                    byte[] rowKeyBytes = Bytes.toBytes(key);
                    rowKey.set(rowKeyBytes);

                    KeyValue keyValue = new KeyValue(rowKeyBytes,
                        Bytes.toBytes("COL"),
                        Bytes.toBytes("FM"),
                        ProductJoin.newBuilder()
                            .setId(key)
                            .setSolrJson(value)
                            .build().toByteArray());

                    return new Tuple2<ImmutableBytesWritable, KeyValue>(rowKey, keyValue);
                }
            });

        // HBase configuration and handle to the target table.
        Configuration baseConf = HBaseConfiguration.create();
        Configuration conf = new Configuration();
        conf.set(HBASE_ZOOKEEPER_QUORUM, "xxx.xxx.xx.xx");
        Job job = new Job(baseConf, "APP-NAME");
        HTable table = new HTable(conf, "hbaseTargetTable");

        // Partition with the custom IntPartitioner and sort within partitions,
        // since HFileOutputFormat2 expects its input sorted by key.
        Partitioner partitioner = new IntPartitioner(importerParams.shards);
        JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
            javaPairRdd.repartitionAndSortWithinPartitions(partitioner);

        // Configure the job for HBase bulk-load output against the target table.
        HFileOutputFormat2.configureIncrementalLoad(job, table);
        System.out.println("Done configuring incremental load....");

        Configuration config = job.getConfiguration();

        repartitionedRdd.saveAsNewAPIHadoopFile(
            "hfilesOutputPath",
            ImmutableBytesWritable.class,
            KeyValue.class,
            HFileOutputFormat2.class,
            config
        );
        System.out.println("Saved to HFiles to: " + importerParams.hfilesOutputPath);
}

1 Answer


OK, problem solved. The trick was to use the KryoSerializer; I added the following to my Java code to register ImmutableBytesWritable with Kryo:

        SparkSession.Builder builder = SparkSession.builder().appName("AWESOME");
        // Switch from the default Java serializer to Kryo.
        builder.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        // Register the HBase writable class with Kryo.
        SparkConf conf = new SparkConf().setAppName("AWESOME");
        Class<?>[] classes = new Class[]{org.apache.hadoop.hbase.io.ImmutableBytesWritable.class};
        conf.registerKryoClasses(classes);

        builder.config(conf);
        SparkSession spark = builder.getOrCreate();
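
For reference, the same settings can presumably also be passed on the command line instead of in code; a minimal sketch using the standard Spark configuration keys, mirroring the spark-submit command from the question with the serializer switch added:

    spark-submit \
        --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
        --conf spark.kryo.classesToRegister=org.apache.hadoop.hbase.io.ImmutableBytesWritable,org.apache.hadoop.hbase.KeyValue \
        --master yarn --deploy-mode client \
        --driver-memory 16G --executor-memory 18G ...

Note that spark.kryo.classesToRegister on its own does not change the serializer; unless spark.serializer is set to KryoSerializer, Spark keeps using the default JavaSerializer, which is why the earlier spark-submit attempt still failed.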
answered 2017-06-21T08:58:32.753