我们使用 qpid-jms-client-0.57.0 从 Azure ServiceBus 发布和接收消息。ServiceBus 提供了从会话接收消息以维护消息顺序的功能。请参阅此处了解更多详细信息 - https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions
我可以使用 JMXGroupId 发布消息,但无法从启用会话的队列中接收消息。出现错误 - javax.jms.JMSException:需要会话的实体无法创建非会话消息接收器。TrackingId:*, SystemTracker:mule-intr-sbus-test-standard:Queue:test-order, Timestamp:2021-07-28T11:07:49 TrackingId:**, SystemTracker:gateway7, Timestamp:2021-07-28T11: 07:49 [条件 = amqp:不允许]
您能否建议从启用会话的队列中接收消息?
示例代码
public void receiveMessage() throws Exception {
System.out.println("** Receiver start **");
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(this);
System.out.println("** Receiver registered **");
}
错误堆栈跟踪
Exception in thread "main" javax.jms.JMSException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:****, SystemTracker:****:Queue:test-order, Timestamp:2021-07-28T10:30:03 TrackingId:****, SystemTracker:gateway7, Timestamp:2021-07-28T10:30:03 [condition = amqp:not-allowed]
at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698)
at org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:125)
at org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:82)
at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:479)
at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:467)
at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:459)
at com.qpid.test.TestSessionEnable.receiveMessage(TestSessionEnable.java:70)
at com.qpid.test.TestSessionEnable.main(TestSessionEnable.java:80)
Caused by: org.apache.qpid.jms.provider.ProviderException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:****, SystemTracker:****:Queue:test-order, Timestamp:2021-07-28T10:30:03 TrackingId:****, SystemTracker:gateway7, Timestamp:2021-07-28T10:30:03 [condition = amqp:not-allowed]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181)
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:299)
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:185)
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:129)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:985)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:871)
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)