尝试实现一个重试记录处理器,该处理器在发生瞬时故障时回溯到最后一条记录的偏移量,而不是在不确定的时间内进入内部重试循环,并且如果错误持续足够长的时间,则冒着消费者会话超时和重新平衡的风险。
kafkaTemplate
.receive()
...
.concatMap(record -> {
var postResponses = mapper.transform(record.value())
.stream()
.map(this::post)
.toArray(Mono[]::new);
return Mono.when(postResponses)
.doOnSuccess(__ -> record.receiverOffset().acknowledge())
.onErrorResume(ex ->
kafkaTemplate.seek(record.receiverOffset().topicPartition(), record.offset())
.then(Mono.delay(someRetryinterval).then())
);
}, 0) // no prefetch!
.subscribe();
将最大轮询记录设置为 1,在此代码开始之前,一个主题中有两条记录,Mono。当一直失败时,我看到它反复寻找 0 和 1,而不是只读取第一条记录,直到它成功。
我错过了什么吗?如何在不切换到阻塞 API 的情况下完全序列化 poll 并寻求调用以处理记录,并根据需要进行尽可能多的重试?
期望具有 SeekToCurrentErrorHandler 的 @KafkaListener 的等效项也可以在 RK 中工作,但发现并非如此