我目前正在尝试基于 openliberty mpReactiveMessaging 功能实现 kafka 消息的消费者:https ://openliberty.io/blog/2019/09/13/microprofile-reactive-messaging.html
给定这样的方法:
@Incoming("greetings")
public CompletionStage <Void> consume(Message<String> greeting ){
throw new RuntimeException("Something really bad happened.");
}
如何为这些类型的异常实现错误处理?默认行为是消费者在第一个异常时停止处理消息,这并不完全会导致稳定/可靠的实现。
对于https://kafka.apache.org/documentation/#sinkconnectconfigs有配置死信队列的属性:
- 容错
- errors.deadletterqueue.topic.name
但是这些对消费者不可用:https : //kafka.apache.org/documentation/#consumerconfigs 有没有办法使用响应式消息在 openliberty 中获得类似的工作?