I have an application on Quarkus that receives AmqpMessages and sends them on to another topic.

I keep getting an error from SmallRye saying that the message was rejected.

Here are the properties:

mp.messaging.incoming.data.address=incoming
mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=false

mp.messaging.outgoing.position.address=outgoing
mp.messaging.outgoing.position.connector=smallrye-amqp
mp.messaging.outgoing.position.host=localhost
mp.messaging.outgoing.position.port=5672
mp.messaging.outgoing.position.durable=false

The class itself:

    @Incoming("data")
    @Outgoing("position")
    public CompletionStage handleMessage(final String topic, final MessagingMessage messageToProcess) {
        final String messageString = messageToProcess.getMessageString();
        final String tenant = messageToProcess.getTenant();
        final String id = messageToProcess.Id();
        // Deserialize the JSON payload into the domain object.
        final Message message = _gson.fromJson(messageString, Message.class);
        // Look up the data for this tenant/id, compute the result, and wrap it in an AMQP message.
        return _service.getStuff(tenant, id)
                .thenApply(stuff -> calculate(message, stuff))
                .thenApply(data -> buildAmqpMessage(tenant, id, message, data))
                .exceptionally(ex -> {
                    _logger.errorv("Error handling message: {0} ", ex);
                    return null;
                });
    }

    public AmqpMessage buildAmqpMessage(final String tenant, final String id,
                                        final Message message, final Data data) {
        final OpenMessage messageToSend = buildMessage(message, data);
        return OutgoingAmqpMessage.builder()
                .withSubject(_gson.toJson(messageToSend))
                .build();
    }

Log output:

2020-05-10 20:35:36,376 DEBUG [io.sma.rea.mes.amq.AmqpConnector] (vert.x-eventloop-thread-1) Sending AMQP message to address `outgoing` 
2020-05-10 20:35:36,377 FINEST [io.ver.pro.imp.ProtonTransport] (vert.x-eventloop-thread-1) New Proton Event: LINK_FLOW
2020-05-10 20:35:36,523 FINE  [pro.trace] (vert.x-eventloop-thread-1) IN: CH[0] : Flow{nextIncomingId=2, incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647, handle=0, deliveryCount=1, linkCredit=250, available=null, drain=false, echo=false, properties=null}
2020-05-10 20:35:36,523 FINE  [pro.trace] (vert.x-eventloop-thread-1) IN: CH[0] : Disposition{role=RECEIVER, first=0, last=null, settled=true, state=Rejected{error=Error{condition=amqp:not-found, description='Deliveries cannot be sent to an unavailable address', info=null}}, batchable=false}
2020-05-10 20:35:36,523 FINEST [io.ver.pro.imp.ProtonTransport] (vert.x-eventloop-thread-1) New Proton Event: LINK_FLOW
2020-05-10 20:35:36,523 FINEST [io.ver.pro.imp.ProtonTransport] (vert.x-eventloop-thread-1) New Proton Event: DELIVERY
2020-05-10 20:35:36,524 ERROR [io.sma.rea.mes.amq.AmqpConnector] (vert.x-eventloop-thread-1) Unable to send the AMQP message: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: message rejected (REJECTED
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1137)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2159)
        at io.vertx.axle.AsyncResultCompletionStage.lambda$toCompletionStage$0(AsyncResultCompletionStage.java:20)
        at io.vertx.amqp.impl.AmqpSenderImpl.lambda$doSend$5(AmqpSenderImpl.java:157)
        at io.vertx.proton.impl.ProtonDeliveryImpl.fireUpdate(ProtonDeliveryImpl.java:158)
        at io.vertx.proton.impl.ProtonTransport.handleSocketBuffer(ProtonTransport.java:160)
        at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:386)
        at io.vertx.core.net.impl.NetSocketImpl.lambda$new$2(NetSocketImpl.java:101)
        at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
        at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
        at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:364)
        at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:369)
        at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
        at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:232)
        at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:173)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:830)

As you can see, a message is rejected, and there is no further output explaining why. Just before that, I can also see: description='Deliveries cannot be sent to an unavailable address'
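
The useful part of the trace is the `Rejected` disposition, which carries the AMQP error condition and description. As a minimal sketch, assuming the proton trace format shown in the log above stays the same, the two fields could be pulled out like this. `RejectionParser` and `Rejection` are hypothetical names used only for illustration; they are not part of SmallRye, Vert.x, or Proton.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the AMQP error from a proton trace line. */
final class RejectionParser {

    /** Holds the two fields worth reading in a Rejected disposition. */
    static final class Rejection {
        final String condition;
        final String description;

        Rejection(String condition, String description) {
            this.condition = condition;
            this.description = description;
        }
    }

    // Matches: Rejected{error=Error{condition=<cond>, description='<text>'
    private static final Pattern REJECTED = Pattern.compile(
            "Rejected\\{error=Error\\{condition=([^,}]+), description='([^']*)'");

    /** Returns the rejection details, or empty if the line has no Rejected state. */
    static Optional<Rejection> parse(String traceLine) {
        Matcher m = REJECTED.matcher(traceLine);
        return m.find()
                ? Optional.of(new Rejection(m.group(1), m.group(2)))
                : Optional.empty();
    }

    public static void main(String[] args) {
        // The disposition line copied from the log output above.
        String line = "IN: CH[0] : Disposition{role=RECEIVER, first=0, last=null, settled=true, "
                + "state=Rejected{error=Error{condition=amqp:not-found, "
                + "description='Deliveries cannot be sent to an unavailable address', info=null}}, "
                + "batchable=false}";
        parse(line).ifPresent(r ->
                System.out.println(r.condition + " -> " + r.description));
    }
}
```

For the line in my log this yields the condition `amqp:not-found`, which in AMQP 1.0 means the peer tried to work with a remote entity that does not exist.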

Any idea why this happens? Before this, we had a JMS implementation using the same topics, and it worked fine.

1 Answer

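Your AMQP broker may not "auto-create" addresses, and so it rejects the message. Have you tried pre-configuring your broker to create these addresses along with their type (unicast/multicast)?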

Answered 2020-05-11T08:45:17.510