26

我是 Kafka 和 Avro 的菜鸟。所以我一直试图让生产者/消费者运行。到目前为止,我已经能够使用以下内容生成和使用简单的字节和字符串: 生产者的配置:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);
        Thread.sleep(250);
    }
    producer.close();
}

现在这一切都很好,当我尝试序列化 POJO 时问题就来了。因此,我能够使用 Avro 提供的实用程序从 POJO 获取 AvroSchema。对模式进行硬编码,然后尝试创建一个通用记录以通过 KafkaProducer 发送,生产者现在设置为:

    Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

这就是问题所在:当我使用 KafkaAvroSerializer 时,生产者没有出现,原因是: 缺少强制参数:schema.registry.url

我阅读了为什么需要这样做,以便我的消费者能够破译生产者发送给我的任何内容。但是模式不是已经嵌入到 AvroMessage 中了吗?如果有人可以分享一个使用 KafkaProducer 和 KafkaAvroSerializer 而无需指定 schema.registry.url 的工作示例,那就太好了

也非常感谢有关模式注册表实用程序的任何见解/资源。

谢谢!

4

5 回答 5

41

首先注意:KafkaAvroSerializervanilla apache kafka 中不提供 - 它由 Confluent Platform 提供。( https://www.confluent.io/ ),作为其开源组件的一部分 ( http://docs.confluent.io/current/platform.html#confluent-schema-registry )

快速回答:不,如果您使用KafkaAvroSerializer,您将需要一个模式注册表。在此处查看一些示例:http: //docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

架构注册表的基本思想是每个主题都将引用一个 avro 架构(即,您只能发送彼此一致的数据。但是架构可以有多个版本,因此您仍然需要为每个主题识别架构记录)

我们不想像您暗示的那样为每个数据编写架构 - 通常,架构比您的数据大!那会浪费每次读取时解析它的时间,并且浪费资源(网络,磁盘,cpu)

相反,模式注册表实例将进行绑定avro schema <-> int schemaId,然后序列化程序将在数据之前仅写入此 id,在从注册表获取它之后(并将其缓存以供以后使用)。

所以在 kafka 内部,你的记录将是[<id> <bytesavro>](以及出于技术原因的魔术字节),这只有 5 个字节的开销(与你的模式的大小相比)并且在读取时,你的消费者会找到与 id 对应的模式,和有关它的反序列化器 avro 字节。您可以在 confluent doc 中找到更多方法

如果你真的有一个用途,你想为每条记录编写模式,你将需要一个其他序列化程序(我认为自己编写,但这很容易,只需重用https://github.com/confluentinc/schema- registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java并删除模式注册表部分以将其替换为模式,读取相同)。但是,如果您使用 avro,我真的不鼓励这样做 - 一天后,您将需要实现类似 avro 注册表之类的东西来管理版本控制

于 2017-08-11T13:56:31.907 回答
4

虽然检查的答案都是正确的,但还应该提到架构注册可以禁用

只需设置auto.register.schemasfalse.

于 2018-09-30T08:03:39.053 回答
2

您可以创建自定义 Avro 序列化程序,然后即使没有 Schema 注册表,您也可以生成主题记录。检查下面的文章。

https://codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html

在这里,他们使用了 Kafkatemplate。我试过使用

KafkaProducer<String, User> UserKafkaProducer

它工作正常但是如果你想使用KafkaAvroSerialiser,你需要给 Schema registryURL

于 2020-06-12T17:10:19.447 回答
2

正如其他人指出的那样,KafkaAvroSerializer 需要 Schema Registry,它是 Confluent 平台的一部分,并且使用需要许可。

使用模式注册表的主要优点是在线上的字节会更小,而不是为每条消息编写带有模式的二进制有效负载。

我写了一篇博文,详细介绍了这些优点

于 2020-07-06T03:33:32.940 回答
0

您始终可以手动实现您的值类Serialiser<T>Deserialiser<T>以及Serde<T>Kafka Streams)。Java 类通常是从 Avro 文件生成的,因此直接编辑它不是一个好主意,但是包装可能很冗长,但可能的方式。

另一种方法是调整用于 Java 类生成的 Arvo 生成器模板并自动生成所有这些接口的实现。Avro maven 和 gradle 插件都支持自定义模板,所以应该很容易配置。

我创建了https://github.com/artemyarulin/avro-kafka-deserializable更改了模板文件和可用于文件生成的简单 CLI 工具

于 2018-11-07T06:54:01.417 回答