我在 Quarkus 上有一个应用程序正在接收 AmqpMessages,并将它们发送到另一个主题。
我不断收到来自 smallrye 的错误消息,说消息被拒绝。
这里是属性
mp.messaging.incoming.data.address=incoming
mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=false
mp.messaging.outgoing.position.address=outgoing
mp.messaging.outgoing.position.connector=smallrye-amqp
mp.messaging.outgoing.position.host=localhost
mp.messaging.outgoing.position.port=5672
mp.messaging.outgoing.position.durable=false
班级本身
@Incoming("data")
@Outgoing("position")
public CompletionStage handleMessage(final String topic, final MessagingMessage messageToProcess) {
final String message = messageToProcess.getMessageString();
final String tenant = messageToProcess.getTenant();
final String Id = messageToProcess.Id();
final Message message = _gson.fromJson(message, Message.class);
return _service.getStuff(tenant, id)
.thenApply(stuff -> calculate(message, thing))
.thenApply(Data -> buildAmqpMessage(tenant, id, message, Data))
.exceptionally(ex -> {
_logger.errorv("Error handling message: {0} ", ex);
return null;
});
}
public AmqpMessage buildAmqpMessage(final String tenant, final String id,
final Message message, final Data data) {
final OpenMessage messageToSend = buildMessage(message, openClosePercentageData);
return OutgoingAmqpMessage.builder()
.withSubject(_gson.toJson(messageToSend))
.build();
}
日志输出:
2020-05-10 20:35:36,376 DEBUG [io.sma.rea.mes.amq.AmqpConnector] (vert.x-eventloop-thread-1) Sending AMQP message to address `outgoing`
2020-05-10 20:35:36,377 FINEST [io.ver.pro.imp.ProtonTransport] (vert.x-eventloop-thread-1) New Proton Event: LINK_FLOW
2020-05-10 20:35:36,523 FINE [pro.trace] (vert.x-eventloop-thread-1) IN: CH[0] : Flow{nextIncomingId=2, incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647, handle=0, deliveryCount=1, linkCredit=250, available=null, drain=false, echo=false, properties=null}
2020-05-10 20:35:36,523 FINE [pro.trace] (vert.x-eventloop-thread-1) IN: CH[0] : Disposition{role=RECEIVER, first=0, last=null, settled=true, state=Rejected{error=Error{condition=amqp:not-found, description='Deliveries cannot be sent to an unavailable address', info=null}}, batchable=false}
2020-05-10 20:35:36,523 FINEST [io.ver.pro.imp.ProtonTransport] (vert.x-eventloop-thread-1) New Proton Event: LINK_FLOW
2020-05-10 20:35:36,523 FINEST [io.ver.pro.imp.ProtonTransport] (vert.x-eventloop-thread-1) New Proton Event: DELIVERY
2020-05-10 20:35:36,524 ERROR [io.sma.rea.mes.amq.AmqpConnector] (vert.x-eventloop-thread-1) Unable to send the AMQP message: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: message rejected (REJECTED
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1137)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2159)
at io.vertx.axle.AsyncResultCompletionStage.lambda$toCompletionStage$0(AsyncResultCompletionStage.java:20)
at io.vertx.amqp.impl.AmqpSenderImpl.lambda$doSend$5(AmqpSenderImpl.java:157)
at io.vertx.proton.impl.ProtonDeliveryImpl.fireUpdate(ProtonDeliveryImpl.java:158)
at io.vertx.proton.impl.ProtonTransport.handleSocketBuffer(ProtonTransport.java:160)
at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:386)
at io.vertx.core.net.impl.NetSocketImpl.lambda$new$2(NetSocketImpl.java:101)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:364)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:369)
at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:232)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:173)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:830)
正如你们所看到的,有一条消息被拒绝并且没有更多的输出,说明为什么会发生这种情况。在此之前,我还可以检测到:description='Deliveries cannot be sent to an unavailable address
知道为什么会这样。在此之前,我们有一个具有相同主题的 JMS 实现,并且运行良好