我目前正在研究 Quarkus 中的 Smallrye Reactive Messaging 集成。乍一看,发送和接收消息非常简单和优雅。
但是我没有发现的一件事是:如何处理重新发送消息?
示例:我们收到一条消息并尝试处理它。发生了一些异常(可能是数据库不可用或乐观锁异常等)。在这种情况下,我会抛出一个异常,这样消息就不会被确认。但目前我看不到消息是如何重新传递的。
我建立了一个小的虚拟项目来测试这个:
- 夸库斯
- ActiveMQ 阿尔忒弥斯
- 将消息(通过 Artemis 控制台)发送到队列
中——队列配置为 max redelivery = 3 - 使用 Quarkus / Smallrye Reactive Messaging @Incoming 注解接收消息
- @Incoming 方法抛出异常
--> 消息从 Artemis 队列中移除
--> @Incoming 方法只被调用一次
如果我关闭 Quarkus 应用程序,可以在 Artemis 队列中再次看到该消息,并将重新传递标志设置为 true。
但是我找不到如何在 Smallrye 反应式消息传递中管理/配置重新传递,以便该层处理消息的重新传递 n 次,并在最大重试次数后将消息放入 DLQ。
有没有办法做到这一点?