在尝试使用 Kafka Streams 流式传输 Avro 数据时,我遇到了这个错误:
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
尽管我在邮件列表中找到了几个关于它的旧线程,但没有一个解决方案说明了该问题。所以希望我能在这里找到解决方案。
我的设置如下所示:
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)
我已经尝试将 设置KEY_SERDE
为与 相同VALUE_SERDE
,但即使这在邮件列表中被“标记”为解决方案,它在我的情况下也不起作用。
我正在GenericData.Record
使用我的 Schema 生成如下:
val record = new GenericData.Record(schema)
...
record.put(field, value)
当我启动调试模式并检查生成的记录时,一切看起来都很好,记录中有数据并且映射正确。
我像这样流式传输 KStream(我之前使用过分支):
splitTopics.get(0).to(s"${destTopic}_Testing")
我是GenericData.Record
用来记录的。这可能是与 结合使用的问题GenericAvroSerde
吗?