我有一个 User 类,我将它序列化为 avro,(使用 Confluent avro 序列化程序和模式注册表)并将其发布到 Kafka 主题。我让消费者将数据打印到控制台,它工作正常。我现在尝试的是从这些数据中创建原始对象。例如,我将“用户”对象作为 avro 发布到 Kafka 主题。我正在尝试在使用后重新创建该用户对象(而不是控制台输出)。这可能吗?
下面是我的代码
用户类
public class User {
int id;
String name;
public User(){}
public User(int id, String name) {
super();
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
消费者代码
User user = new User();
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "avro-consumer-group");
props.put("key.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put("value.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put("schema.registry.url","http://127.0.0.1:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList("avrotesttopic"));
System.out.println("Subscribed to topic " + "avrotesttopic");
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (org.apache.kafka.clients.consumer.ConsumerRecord<String, GenericRecord> record : records){
System.out.printf("value = %sn",record.value());
//output-> value = {"id": 10, "name": "testName"}
}
}
谢谢