我的 Quarkus 微服务使用 smallrye 反应式消息传递库中的 AMQP 连接器向从vromero/activemq-artemis:2.16.0-alpine
Docker 映像运行的 ActiveMQ Artemis 代理生成消息。反应式消息传递库文档提到了使用动态地址名称的可能性。我在我的 REST 资源中使用以下 (Kotlin) 代码:
@Inject
@Channel("task-finished")
lateinit var taskFinishedEmitter: MutinyEmitter<String>
@POST
@Produces(MediaType.TEXT_PLAIN)
fun doSomethingAndInform(@RestForm customerId: String): Uni<String> {
// leaving out the actual messageText computation...
val messageText: String = "DUMMY MESSAGE"
val metadata: OutgoingAmqpMetadata = OutgoingAmqpMetadata.builder()
.withDurable(true)
.withCorrelationId(customerId)
.withAddress("anycast://my-custom-address")
.build()
val message: Message<String> = Message.of(messageText,
{
logger.info("message acked")
CompletableFuture.completedFuture(null)
},
{
logger.info("message nacked: {}", it.message)
CompletableFuture.completedFuture(null)
}
)
taskFinishedEmitter.send(message.addMetadata(metadata))
return Uni.createFrom().item("DONE")
}
连接器定义在application.properties
:
amqp-host=localhost
amqp-port=5672
amqp-username=adm
amqp-password=***
mp.messaging.outgoing.task-finished.connector=smallrye-amqp
ActiveMQ Artemis 确实my-custom-address
动态地创建了地址,但是它没有创建任何绑定到它的队列,并且消息最终被取消路由。
配置broker.xml
文件包含在core
部分
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
<address-setting match="#">
<auto-create-queues>true</auto-create-queues>
</address-settings>
我尝试将队列名称与地址一起传递
.withAddress("anycast://my-custom-address::my-queue")
但这没有任何区别。
以编程方式创建的队列和传递给它的消息缺少什么?另外,为什么 Artemis 在消息丢失(未路由)时会确认消息?