4

我浏览了 spring-cloud-stream 1.0.0.RELEASE 的文档,似乎找不到任何有关错误处理的文档。

根据对 kafka 0.9 的观察,如果我的消费者抛出 RuntimeException,我会看到 3 次重试。3 次重试后,我在日志中看到:

2016-05-17 09:35:59.216 ERROR 8983 --- [  kafka-binder-] o.s.i.k.listener.LoggingErrorHandler     : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]]

org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message

此时,消费者偏移量滞后 1,如果我重新启动消费者,则再次重试消息 3 次。但是,如果我随后将另一条消息发送到同一分区以使消费者不引发异常,则消费者偏移量将被更新,并且我们为其引发异常的原始消息在重新启动后将不再重试。

这是否记录在我没有找到的地方?错误处理是特定于活页夹的,还是 scs 将其抽象为在活页夹之间保持一致?我怀疑这是消费者偏移量如何使用 kafka binder 更新的意外结果。我看到添加了 enableDlq kafka 消费者属性,我即将对其进行测试,但我不确定我们如何处理 kafka 中的死信。我对 rabbitmq 中的死信队列很熟悉,但是使用 rabbitmq,我们可以使用 rabbitmq shovel 插件来重新发布和重试 dlq 消息,以涵盖由于临时服务中断导致失败的情况。除了自己编写类似的实用程序外,我不知道 kafka 有任何类似的功能。

更新:启用 enableDlq kafka 消费者属性的测试显示与错误处理相同的消费者偏移量问题。当消费者抛出 RuntimeException 时,我看到 3 次重试,之后未记录错误消息,并且看到一条消息已error.<destination>.<group>按记录发布到,但消费者偏移量未更新并滞后 1。如果我重新启动消费者,它尝试再次处理来自原始主题分区的相同失败消息,重试 3 次并再次将相同消息放在error.<destination>.<group>主题上(重复 dlq 消息)。如果我将另一条消息发布到消费者未引发 RuntimeException 的同一主题分区,则偏移量将更新,并且在重新启动时不再重试原始失败消息。

我认为无论 enableDlq 是否为真,当消费者抛出错误时,消费者都应该更新 kafka 中的消费者偏移量。这至少可以使所有重试失败的消息被丢弃(当 enableDlq 为 false 时)或发布到 dlq 并且永远不会重试(当 enableDlq 为 true 时)。

4

1 回答 1

2

对我来说看起来像一个错误 - 侦听器容器有一个属性autoCommitOnErrorfalse默认情况下),它没有被活页夹公开(或设置)。如果布尔值为真,则在调用错误处理程序(发布错误)后,提交偏移量。

请在github中将其报告为问题。

于 2016-05-17T17:25:26.103 回答