2

我正在使用以下代码(不是真的,但让我们假设它)来创建一个模式并由生产者将其发送给 kafka。

public static final String USER_SCHEMA = "{"
        + "\"type\":\"record\","
        + "\"name\":\"myrecord\","
        + "\"fields\":["
        + "  { \"name\":\"str1\", \"type\":\"string\" },"
        + "  { \"name\":\"str2\", \"type\":\"string\" },"
        + "  { \"name\":\"int1\", \"type\":\"int\" }"
        + "]}";

public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);

        Thread.sleep(250);

    }

    producer.close();
}

问题是代码只允许我使用此模式发送 1 条消息。然后我需要更改模式名称以发送下一条消息......所以名称字符串现在是随机生成的,所以我可以发送更多消息。这是一个黑客,所以我想知道正确的方法来做到这一点。

我还研究了如何在没有架构的情况下发送消息(即,已经向 kafka 发送了 1 条带有架构的消息,现在所有其他消息都不再需要架构了)——但new GenericData.Record(..)需要一个架构参数。如果它为空,它会抛出一个错误。

那么将avro模式消息发送到kafka的正确方法是什么?

这是另一个代码示例 - 与我的非常相似:
https ://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io /confluent/examples/producer/ProducerExample.java

它也没有显示如何在不设置模式的情况下发送。

4

1 回答 1

1

我不明白这条线:

问题是代码只允许我使用此模式发送 1 条消息。然后我需要更改架构名称才能发送下一条消息。

在这两个示例中,您提供的示例和您提供的融合示例中,模式都不会发送到 Kafka。

在您提供的示例中,用于创建 GenericRecord 对象的架构。您提供架构,因为您想针对某个架构验证记录(例如,验证您只能将整数 int1 字段放入 GenericRecord 对象中)。

在您的代码中,唯一的区别是您决定将数据序列化为 byte[],这可能不需要,因为您可以将此责任委托给 KafkaAvroSerializer,正如您在 confluent 示例中所见。

GenericRecord 是一个 Avro 对象,它不是 Kafka 强制执行的。如果您想将任何类型的对象发送到 Kafka(带有或不带有模式),您只需要创建(或使用现有的)序列化程序,它将您的对象转换为 byte[] 并在您为生产者。

通常,使用 Avro 消息本身发送指向模式的指针是一个好习惯。您可以在以下链接中找到原因:http: //www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/

于 2016-09-20T23:21:47.803 回答