1

我一直在学习使用火花流从 kafka 生成和使用消息的教程。这个想法是产生以 avro 格式序列化的简单消息。反序列化来自 avro 格式的消息并使用 spark 流式处理。由于双射 api 引发 Failed to Invert 异常,我无法使用该消息。

制片人:

public static final String schema = "{"
+"\"fields\": ["
+   " { \"name\": \"str1\", \"type\": \"string\" },"
+   " { \"name\": \"str2\", \"type\": \"string\" },"
+   " { \"name\": \"int1\", \"type\": \"int\" }"
+"],"
+"\"name\": \"myrecord\","
+"\"type\": \"record\""
+"}"; 

public static void startAvroProducer() throws InterruptedException{
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(AvroProducer.schema);

    Injection<GenericRecord, byte[]> inject = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String,byte[]> producer = new KafkaProducer<String,byte[]>(props);
    for(int i=0;i<1000;i++){
        GenericData.Record record = new GenericData.Record(schema);
        record.put("str1", "str1-"+i);
        record.put("str2", "str2-"+i);
        record.put("int1", i);

        byte[] bytes = inject.apply(record);

        ProducerRecord<String,byte[]> producerRec = new ProducerRecord<String, byte[]>("jason", bytes);
        producer.send(producerRec);
        Thread.sleep(250);

    }

    producer.close();
}

消费者:

 private static SparkConf sc = null;
        private static JavaSparkContext jsc = null;
        private static JavaStreamingContext jssc = null;
        private static Injection<GenericRecord,byte[]> inject = null;

        static {
            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(AvroProducer.schema);
            inject = GenericAvroCodecs.apply(schema);
        }

        public static void startAvroConsumer() throws InterruptedException {
            sc = new SparkConf().setAppName("Spark Avro Streaming Consumer")
                    .setMaster("local[*]");
            jsc = new JavaSparkContext(sc);
            jssc = new JavaStreamingContext(jsc, new Duration(200));

            Set<String> topics = Collections.singleton("jason");
            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", "localhost:9092");
            JavaPairInputDStream<String, byte[]> inputDstream = KafkaUtils
                    .createDirectStream(jssc, String.class, byte[].class,
                            StringDecoder.class, DefaultDecoder.class, kafkaParams,
                            topics);

            inputDstream.map(message -> inject.invert(message._2).get()).foreachRDD(rdd -> {
                    rdd.foreach(record -> {
                        System.out.println(record.get("str1"));
                        System.out.println(record.get("str2"));
                        System.out.println(record.get("int1"));
                    });
                });

            jssc.start();
            jssc.awaitTermination();
        }

例外:

com.twitter.bijection.InversionFailure: Failed to invert: [B@3679b3f6
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.util.Failure.recoverWith(Try.scala:203)
    at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
    at com.twitter.bijection.avro.GenericAvroCodec.invert(AvroCodecs.scala:293)
    at com.twitter.bijection.avro.GenericAvroCodec.invert(AvroCodecs.scala:276)
    at com.applications.streaming.consumers.AvroConsumer.lambda$0(AvroConsumer.java:54)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Not a data file.
    at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:105)
    at org.apache.avro.file.DataFileStream.<init>(DataFileStream.java:84)
    at com.twitter.bijection.avro.GenericAvroCodec$$anonfun$invert$2.apply(AvroCodecs.scala:295)
    at com.twitter.bijection.avro.GenericAvroCodec$$anonfun$invert$2.apply(AvroCodecs.scala:293)
    at com.twitter.bijection.Inversion$$anonfun$attempt$1.apply(Inversion.scala:32)
    at scala.util.Try$.apply(Try.scala:192)
    ... 18 more
4

0 回答 0