1

我试图弄清楚如何处理带有错误 avro 消息的异常。我目前正在

{"@timestamp":"2019-06-13T20:20:38.636+00:00","@version":1,"message":"[Incoming aggregation] Upstream failed.","logger_name":"akka.stream.Materializer","thread_name":"system-akka.actor.default-dispatcher-5","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition conversation-7 at offset 1737997. If needed, please seek past the record to continue consumption.\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 42\nCaused by: java.lang.ArrayIndexOutOfBoundsException: 51

可以看出,这打破了流。我无法在决策者中处理此问题,因为它是消费者来源的一部分。在文档中,它说我应该将流作为原始字节读取,并在处理链中进一步的 Flow 阶段手动进行解析。但是,如果我使用 Schema 注册表,我认为这是不可能的。

有人可以提示我处理此问题的正确方法是什么吗?

谢谢

4

0 回答 0