0

我的 Quarkus 微服务使用 smallrye 反应式消息传递库中的 AMQP 连接器向从vromero/activemq-artemis:2.16.0-alpineDocker 映像运行的 ActiveMQ Artemis 代理生成消息。反应式消息传递库文档提到了使用动态地址名称的可能性。我在我的 REST 资源中使用以下 (Kotlin) 代码:

    @Inject
    @Channel("task-finished")
    lateinit var taskFinishedEmitter: MutinyEmitter<String>

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    fun doSomethingAndInform(@RestForm customerId: String): Uni<String> {

       // leaving out the actual messageText computation...

       val messageText: String = "DUMMY MESSAGE"
       val metadata: OutgoingAmqpMetadata = OutgoingAmqpMetadata.builder()
          .withDurable(true)
          .withCorrelationId(customerId)
          .withAddress("anycast://my-custom-address")
          .build()

       val message: Message<String> = Message.of(messageText,
            {
                logger.info("message acked")
                CompletableFuture.completedFuture(null)
            },
            {
                logger.info("message nacked: {}", it.message)
                CompletableFuture.completedFuture(null)
            }
       )

       taskFinishedEmitter.send(message.addMetadata(metadata))
       return Uni.createFrom().item("DONE")
    }

连接器定义在application.properties

amqp-host=localhost
amqp-port=5672
amqp-username=adm
amqp-password=***

mp.messaging.outgoing.task-finished.connector=smallrye-amqp

ActiveMQ Artemis 确实my-custom-address动态地创建了地址,但是它没有创建任何绑定到它的队列,并且消息最终被取消路由

配置broker.xml文件包含在core部分


<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
<address-setting match="#">
   <auto-create-queues>true</auto-create-queues>
</address-settings>

我尝试将队列名称与地址一起传递

.withAddress("anycast://my-custom-address::my-queue")

但这没有任何区别。

以编程方式创建的队列和传递给它的消息缺少什么?另外,为什么 Artemis 在消息丢失(未路由)时会确认消息?

更新:附上来自 Artemis 网络界面的屏幕截图 在此处输入图像描述

4

1 回答 1

1

默认情况下,ActiveMQ Artemis 会将 AMQP 客户端发送的消息视为多播(即发布/订阅)。多播的语义规定,发布到代理的每条消息都将分派给每个订阅者。然而,由于没有订阅者,消息被简单地丢弃(即未路由)。

由于您要为地址添加前缀,因此您应该在使用参数anycast://时在“amqp”上配置该前缀,例如:acceptorbroker.xmlanycastPrefix

<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>

您还可以更改用于自动创建资源的默认路由类型,例如:

<address-setting match="my-custom-address">
   <default-address-routing-type>ANYCAST</default-address-routing-type>
   <default-queue-routing-type>ANYCAST</default-queue-routing-type>
</address-settings>
于 2021-03-01T00:08:37.340 回答