2

我正在使用 quarkus 版本:1.5.2.Final以及以下依赖项以包含反应式消息传递(我在发送方和接收方项目之间有一个正在运行的 Artemis docker 实例):

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>

在发件人项目上,我@Outgoing('stock-quote')在一个方法上有一个注释,该方法每秒产生一个随机数并返回Flowable<Message<String>>. 配置如下:

mp.messaging.outgoing.stock-quote.connector=smallrye-amqp
mp.messaging.outgoing.stock-quote.address=stocks
mp.messaging.outgoing.stock-quote.durable=true

在receiver项目上,receiver所在的位置有一个带有注解@Incoming('stock)'.的方法配置如下:

mp.messaging.incoming.stocks.connector=smallrye-amqp
mp.messaging.incoming.stocks.durable=true

我注意到了什么?一切正常。当接收方项目由于某种原因宕机时,发送方仍在发送的消息会被持久化,这很好。但是,当接收器再次联机时,接收器不会收到持久消息,而是在单独的“客户端线程”上启动。持久化的消息只是留在那里。

我还尝试@Broadcast在发件人端添加注释。由于我还希望有多个接收器实例,因此消息被分配给接收器。然后,当接收者下线时,消息将再次被持久化。当接收器恢复时,它会收到正在发送的新消息,但持久消息只是留在那里(在另一个“客户端线程”中,更糟糕的是,这个“客户端线程”上的持久消息正在累积(由于广播)。

那我想要什么?我希望有一个发送消息的发件人。这些消息被划分为两个或多个接收器实例。如果一个接收器出现故障,则消息将由其他实例处理。如果所有的接收者都挂了,则消息正在被持久化,一旦有一个或多个接收者再次在线,这些持久化的消息就会被处理。所以基本上我想要这个:

在此处输入图像描述

任何人都知道如何结合 quarkus 反应消息和 artemis 来做到这一点?

4

0 回答 0