有一个消息到达的 Kafka 主题。我需要阅读一条消息,对其进行处理并继续处理下一条消息。消息处理可能会失败,如果发生这种情况,则必须重试处理几次(比如说 10 次),然后才能继续处理下一条消息。如果处理失败 10 次,则需要丢弃该消息,我们应该继续处理下一条消息。
我们使用reactor-kafka
,所有的处理都需要是反应式的。
这是我试图解决这个问题的方法:
Flux.defer(receiver::receive)
.concatMap(this::processRecord)
.retryBackoff(10, ofMillis(500))
.concatMap(record -> record.receiverOffset().commit())
.subscribe();
(这里receiver
是一个KafkaReceiver<String, String>
)。
这适用于没有任何异常的情况,如果有异常,processRecord()
则重试 10 次。这里的问题是,如果在 10 次允许的尝试后仍然失败,则偏移量不会被提交(当然),所以下次从 Kafka 读取相同的偏移量时,有效地,处理会永远卡在“错误”偏移量上.
我试图实现以下明显的想法:如果异常“传递得比retryBackoff()
操作员更远”,则提交当前偏移量。要提交偏移量,我们需要一个ReceiverRecord
,因此我将异常包装ExceptionWithRecord
与当前记录一起添加:
// in processRecord()
.onErrorMap(ex -> new ExceptionWithRecord(record, ex))
和
Flux.defer(receiver::receive)
.concatMap(this::processRecord)
.retryBackoff(10, ofMillis(500))
.concatMap(record -> record.receiverOffset().commit())
.onErrorResume(this::extractRecordAndMaybeCommit)
.subscribe();
extractRecordAndMaybeCommit()
从给定的异常中提取ReceiverRecord
并提交它:
return record.receiverOffset().commit();
如果重试次数用尽,这种传递记录并稍后提交记录的方法有效,并且该.commit()
方法被调用。但它没有效果。
事实证明,当任何异常进入上面的反应管道时,DefaultKafkaReceiver.dispose()
都会被调用,因此任何后续的提交尝试都会被忽略。因此事实证明,一旦发布者“看到”任何异常,就根本不可能提交偏移量。
如何在仍在使用的同时实现“在 N 个错误后提交”行为reactor-kafka
?