3

我有一个 User 类,我将它序列化为 avro,(使用 Confluent avro 序列化程序和模式注册表)并将其发布到 Kafka 主题。我让消费者​​将数据打印到控制台,它工作正常。我现在尝试的是从这些数据中创建原始对象。例如,我将“用户”对象作为 avro 发布到 Kafka 主题。我正在尝试在使用后重新创建该用户对象(而不是控制台输出)。这可能吗?

下面是我的代码

用户类

public class User { 
    int id;
    String name;    
    public User(){} 
    public User(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }   
}

消费者代码

User user = new User();



Properties props = new Properties();

props.put("bootstrap.servers", "127.0.0.1:9092");

props.put("group.id", "avro-consumer-group");

props.put("key.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

props.put("value.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

props.put("schema.registry.url","http://127.0.0.1:8081");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);



consumer.subscribe(Arrays.asList("avrotesttopic"));

System.out.println("Subscribed to topic " + "avrotesttopic");



while (true) {

    ConsumerRecords<String, GenericRecord> records = consumer.poll(100);

    for (org.apache.kafka.clients.consumer.ConsumerRecord<String, GenericRecord> record : records){

        System.out.printf("value = %sn",record.value());    

        //output->  value = {"id": 10, "name": "testName"}

    }

}

谢谢

4

3 回答 3

4

考虑到您正在使用 KafkaAvroDeserializer,您需要将以下属性设置为您的使用者配置的一部分

    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

它将允许您获得SpecificRecord而不是GenericRecord已经由消费者处理。这是一个例子

https://dzone.com/articles/kafka-avro-serialization-and-the-schema-registry

于 2018-05-04T14:36:24.680 回答
0

您可以将 AvroSchema 转换为 JsonSchema,然后您可以从 JsonSchema 创建一个 POJO 类。

将 AVRO 转换为 JSON:

static void convertAvroToJson(InputStream inputStream, OutputStream outputStream, Schema schema)
        throws IOException {
    DatumReader<Object> reader = new GenericDatumReader<>(schema);
    DatumWriter<Object> writer = new GenericDatumWriter<>(schema);

    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);

    JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, outputStream, true);
    Object datum = null;
    while (!binaryDecoder.isEnd()) {
        datum = reader.read(datum, binaryDecoder);
        writer.write(datum, jsonEncoder);
        jsonEncoder.flush();
    }
    outputStream.flush();
}

然后将 JSONSchema 转换为 POJO ,

https://dzone.com/articles/converting-json-to-pojos-using-java

于 2018-05-04T09:18:04.737 回答
0

这取决于您如何将输入记录序列化到 Avro。

简短的回答,如果您的序列化没问题,那么您必须在接收记录时进行以下更改

ConsumerRecords<String, User> records = consumer.poll(100);

有关完整示例,请参阅

于 2018-05-04T12:47:20.947 回答