0

我有一个关于#protobuf #serialization 的问题,它发生在#nodejs 和#apache #kafka 中,由#confluent 平台在一个社区中运行。

我使用 google protobuf 或 protobufjs 序列化数据,然后使用 kafkajs 将其发送到 kafka。但是,当我提交数据时,kafka-protobuf-console-consumer 给了我一个序列化异常。请检查源代码并帮助我。 https://github.com/smhmayboudi/kafka-protobuf-console

org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 53
Caused by: java.lang.IllegalArgumentException: Invalid message indexes: io.confluent.kafka.schemaregistry.protobuf.MessageIndexes@59d77850
    at io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema.toMessageName(ProtobufSchema.java:903)
    at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:119)
    at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:98)
    at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter$ProtobufMessageDeserializer.deserialize(ProtobufMessageFormatter.java:130)
    at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter$ProtobufMessageDeserializer.deserialize(ProtobufMessageFormatter.java:104)
    at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter.writeTo(ProtobufMessageFormatter.java:88)
    at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:173)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:118)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
4

1 回答 1

0

在 Gerard Klijs 的帮助下,我通过在 confluent slack 中讨论找到了解决方案。感谢 Gerard Klijs 和 Ricardo Ferreira。另外我更新了 git repo。

java和其他语言之间的序列化器格式是不同的。所以你必须遵循这种格式化风格:[Magic Byte] + [Schema ID] + [Message Index Data] + [Message Payload],其中 Message Index Data 为零。

来源:https ://riferrei.com/2020/07/09/data-sharing-between-java-go-using-kafka-and-protobuf/

于 2020-11-17T12:09:32.980 回答