0

我有一个基于 C++ 的遗留系统,它输出支持融合 Avro 模式注册表格式的二进制编码 Avro 数据。在我的 Java 应用程序中,我使用 KafkaAvroDeserializer 类成功地反序列化了消息,但无法打印出消息。

private void consumeAvroData(){
    String group = "group1";
    Properties props = new Properties();
    props.put("bootstrap.servers", "http://1.2.3.4:9092");
    props.put("group.id", group);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", LongDeserializer.class.getName());
    props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
   // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,"false");
    props.put("schema.registry.url","http://1.2.3.4:8081");
    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);

    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    System.out.println("Subscribed to topic " + TOPIC_NAME);

    while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
        for (ConsumerRecord<String, GenericRecord> record : records)
        {
            System.out.printf("value = %s\n",record.value());
        }
    }
}

我得到的输出是

{"value":"�"}

为什么我无法打印反序列化数据?任何帮助表示赞赏!

4

1 回答 1

2

Confluent Avro Serializer 的线路格式记录在题为“线路格式”的部分中

http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

它是一个魔术字节(当前始终为 0),后跟 Schema Registry 返回的 4 字节 Schema ID,然后是一组字节,这些字节是 Avro 二进制编码中的 Avro 序列化数据。

如果您将消息读取为 ByteArray 并打印出前 5 个字节,您将知道这是否真的是 Confluent Avro 序列化消息。应该是 0 后跟 0001 或其他一些模式 ID,您可以检查它是否在本主题的模式注册表中。

如果不是这种格式,那么消息可能会以另一种方式序列化(没有 Confluent Schema Registry),您需要使用不同的反序列化器,或者可能从消息值中提取完整的 Schema,甚至需要从其他一些获取原始 Schema 文件能够解码的源。

于 2017-08-27T05:22:09.147 回答