1

我正在使用 Reactor-Kafka 1.2.4,目前面临的问题是在处理消息时遇到错误,Kafka 消费者停止并且不会继续处理其他消息。

我目前只想记录错误消息并继续处理下一条消息,稍后将添加 DLQ 处理等。

我已经将 Reactor-Kafka 升级到 1.3.0,另外,下面的代码是否足够好?

 KafkaReceiver
              .receive()
              .retry(3)
              .subscribeOn(Schedulers.single())
              .publishOn(Schedulers.immediate())
              .subscribe(this::processRecord);
              
 private void processRecord(ReceiverRecord<String, M> record) {
      try {
        <Process record>
      } catch (Exception e) {
        log.error("Error thrown {} while consuming message", e.getMessage());
        <Later on Publish to Dead letter Queue>
      } finally {
        record.receiverOffset().acknowledge();
        log.debug("Successfully acknowledged kafka message");
      }
  } 

谢谢你。

4

0 回答 0