0

我按照官方 Quarkus 消息传递指南创建了一个简单的示例来体验使用 AMQP(Apache Artemis)的反应式消息传递特性。

完整的代码在这里

该示例正在运行,但存在一个小问题,我必须先启动一个curl来使用消息,然后再使用另一个curl来发送消息。

// start consumer side.
curl http://localhost:8080/messages -H "Accept:text/event-stream"

// start sending.
curl http://localhost:8080/messages -d "Hello, Quarkus" -H "Content-Type:text/plain"

// then the consumer exit.
// and sending a message will cause an exception.


如果取消消费者,然后发送消息,则会抛出异常。

2020-10-12 20:18:54,137 WARN  [io.net.cha.AbstractChannelHandlerContext] (vert.x-eventloop-thread-6) Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@4bda4836(failure: java.nio.channels.ClosedChannelException), unnotified cause: java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
    at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:832)
: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
    at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:867)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
    at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:832)

即使没有连接消费者,也可以在 Apache Artemis 中缓存消息。

4

1 回答 1

0

Apache ActiveMQ Artemis 具有强大而灵活的寻址模型。如果您的场景是点对点消息传递,您可以定义自定义代理配置的地址,即:

<address name="messages">
    <anycast>
        <queue name="messages" />
    </anycast>
</address>

代理将在启动时创建配置中定义的所有地址和队列,因此当发送者发送消息时,它将被路由到现有队列。

于 2020-10-12T13:04:04.930 回答