我正在使用以下代码(不是真的,但让我们假设它)来创建一个模式并由生产者将其发送给 kafka。
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
问题是代码只允许我使用此模式发送 1 条消息。然后我需要更改模式名称以发送下一条消息......所以名称字符串现在是随机生成的,所以我可以发送更多消息。这是一个黑客,所以我想知道正确的方法来做到这一点。
我还研究了如何在没有架构的情况下发送消息(即,已经向 kafka 发送了 1 条带有架构的消息,现在所有其他消息都不再需要架构了)——但new GenericData.Record(..)
需要一个架构参数。如果它为空,它会抛出一个错误。
那么将avro模式消息发送到kafka的正确方法是什么?
这是另一个代码示例 - 与我的非常相似:
https ://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io /confluent/examples/producer/ProducerExample.java
它也没有显示如何在不设置模式的情况下发送。