1

我正在尝试使用 Confluent Kafka REST 代理从我的一个主题中检索 Avro 格式的数据,但不幸的是我收到了反序列化错误。我正在使用以下命令查询 Kafka REST 代理

 curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" 
http://localhost:8082/consumers/my-group/instances/my-consumer/records?timeout=30000

我得到了回应

{
  "error_code": 50002,
  "message": "Kafka error: Error deserializing key/value for partition input-0 at offset 0. If needed, please seek past the record to continue consumption."
}

Kafka Rest Proxy 服务器上的日志是:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition input-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

数据是使用 KafkaAvroSerializer 生成的,并且模式存在于模式注册表中。另请注意,通过在 CLI 上使用 avro-console-consumer 可以读取数据。

有谁知道如何解决这个问题?

4

2 回答 2

0

直到最近才支持在 rest 代理中包含字符串键和 AVRO 值:

https://github.com/confluentinc/kafka-rest/issues/210

所以最近代码已被合并,但问题仍然存在,文档尚未完全更新:

https://github.com/confluentinc/kafka-rest/pull/797

于 2021-06-15T20:34:01.120 回答
0

很可能除了关于该主题的有效 Avro 消息外,您也有无效消息。这就是此错误的含义,并且正是我尝试使用 REST 代理在本地使用非 Avro 消息时遇到的错误:

ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@2e20d4f3  (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition avrotest-0 at offset 2. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我会使用一种工具kafkacat来检查错误中给出的偏移量处的实际消息,例如:

kafkacat -C -b localhost:9092 -t test_topic_avro -o 0 -c 1

-o 0在偏移量 0 处使用消息,-c 1意味着只使用一条消息。

您还可以寻找有问题的偏移量,例如对于主题avrotest将偏移量移动到1

echo '{ "offsets": [ { "topic": "avrotest", "partition": 0, "offset": 1 } ] }' | \
http POST localhost:8082/consumers/rmoff_consumer_group/instances/rmoff_consumer_instance/positions \
Content-Type:application/vnd.kafka.v2+json
于 2019-03-08T16:19:03.907 回答