3

我目前正在尝试基于 openliberty mpReactiveMessaging 功能实现 kafka 消息的消费者:https ://openliberty.io/blog/2019/09/13/microprofile-reactive-messaging.html

给定这样的方法:

@Incoming("greetings")
public CompletionStage <Void> consume(Message<String> greeting ){
   throw new RuntimeException("Something really bad happened.");
}

如何为这些类型的异常实现错误处理?默认行为是消费者在第一个异常时停止处理消息,这并不完全会导致稳定/可靠的实现。

对于https://kafka.apache.org/documentation/#sinkconnectconfigs有配置死信队列的属性:

  • 容错
  • errors.deadletterqueue.topic.name

但是这些对消费者不可用:https : //kafka.apache.org/documentation/#consumerconfigs 有没有办法使用响应式消息在 openliberty 中获得类似的工作?

4

0 回答 0