0

我有一个基于 Spring-Boot 的小型原型,可以使用 Protobuf 将消息发布到 Kafka 集群。我正在使用融合序列化程序:

  • io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
  • io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer

我还从 Confluent(最新版本)运行 Schema Registry 来处理 Protobuf 模式。一切都按预期工作。

现在,我想介绍 Cloudevents 规范(https://github.com/cloudevents/spec),但我很难理解它如何与 Confluent Schema Registry 一起工作。

Cloudevents 有一个 sdk模块,可以将消息直接序列化到 Protobuf。消息的data部分是我的版本化有效负载应该去的地方,但是没有办法只为消息的一部分定义模式。为了更清楚:

 CloudEvent event = CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withType("example.vertx")
                .withSource(URI.create("http://localhost"))
                .withData(???) <-- HERE IS WHERE MY PAYLOAD SHOULD BE VERSIONED
                .build();

一种解决方案是复制 Cloudevent protobuf架构,并在每个 protobuf 架构文件中简单地定义消息规范。这样做的缺点是我必须为每条新消息复制/粘贴 Cloudevents protobuf 模式。这将允许我在不使用任何 Cloudevent 库的情况下使用标准 Protobuf Kafka serde。有更好的解决方案吗?

4

1 回答 1

0

如果您使用的是 Kafka,您应该查看CloudEvents Kafka Protocol Spec,它将有自己的 Kafka Serializer 类

如果您阅读了该内容,它将引用 Binarydatacontenttype和类似的标头application/cloudevents+avro,可以以 . 为后缀+protobuf

如果我正确阅读了规范,Kafka 值本身“必须”是 JSON 格式,并且您的实际有效负载事件的数据可以是二进制编码的(我猜是 base64 字符串?因为 JSON 没有二进制类型)

基本上,发生的情况是您需要使用您提到的类手动序列化 Protobuf 事件,并与模式注册表进行通信。然后将其粘贴在 CloudEvent 记录中,最后使用一些“CloudEventSerializer”并生成...
然后在另一侧执行相反的操作;从值中提取data有效负载,并将其传递给KafkaProtobufDeserializer.deserialize方法。

于 2022-02-25T17:23:11.550 回答