我正在使用 Reactor-Kafka 1.2.4,目前面临的问题是在处理消息时遇到错误,Kafka 消费者停止并且不会继续处理其他消息。
我目前只想记录错误消息并继续处理下一条消息,稍后将添加 DLQ 处理等。
我已经将 Reactor-Kafka 升级到 1.3.0,另外,下面的代码是否足够好?
KafkaReceiver
.receive()
.retry(3)
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.immediate())
.subscribe(this::processRecord);
private void processRecord(ReceiverRecord<String, M> record) {
try {
<Process record>
} catch (Exception e) {
log.error("Error thrown {} while consuming message", e.getMessage());
<Later on Publish to Dead letter Queue>
} finally {
record.receiverOffset().acknowledge();
log.debug("Successfully acknowledged kafka message");
}
}
谢谢你。