Artemis message producer Java program:
initialContext = new InitialContext();
// Step 2. Look-up the JMS topic
Topic topic = (Topic) initialContext.lookup("topic/exampleTopic");
// Step 3. Look-up the JMS connection factory
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
// Step 4. Create a JMS connection
connection = cf.createConnection();
// Step 5. Set the client-id on the connection
connection.setClientID("durable-client");
// Step 6. Start the connection
connection.start();
// Step 7. Create a JMS session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 8. Create a JMS message producer
MessageProducer messageProducer = session.createProducer(topic);
// Create the text message to send
TextMessage message1 = session.createTextMessage("This is a text message 1");
// Step 11. Send the text message to the topic
messageProducer.send(message1);
......
Client (using Spring) ->
@Bean("amqTransportConfiguration")
public TransportConfiguration amqTransportConfiguration() {
    return new TransportConfiguration(NettyConnectorFactory.class.getName(), getParams("61616"));
}

@Bean("connectionFactory")
public ConnectionFactory activeMQJMSConnectionFactory(
        @Qualifier("amqTransportConfiguration") TransportConfiguration transportConfiguration) throws JMSException {
    ActiveMQJMSConnectionFactory activeMQJMSConnectionFactory =
            new ActiveMQJMSConnectionFactory(false, transportConfiguration);
    activeMQJMSConnectionFactory.setPassword("xxxxx");
    activeMQJMSConnectionFactory.setUser("xxxx");
    activeMQJMSConnectionFactory.setClientID(clientServer0Id);
    return activeMQJMSConnectionFactory;
}

@Bean
public MessageListenerContainer listenerContainer1(
        @Qualifier("connectionFactory") ConnectionFactory connectionFactory,
        Consumer consumer,
        ObjectMessageConverter messageConverter,
        @Qualifier("topic") Topic topic) {
    return new DefaultMessageListenerContainerBuilder().connectionFactory(connectionFactory)
            .destination(topic)
            .messageConverter(messageConverter)
            .messageListener(consumer)
            .sessionAcknowledgeMode(1) // 1 == Session.AUTO_ACKNOWLEDGE
            .pubSubDomain(true)
            .subscriptionDurable(true)
            .clientId(clientServer0Id)
            .durableSubscriptionName(subscriptionName)
            .build();
}
ObjectMessageConverter ->
@Component
public class ObjectMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        return session.createObjectMessage((Serializable) object);
    }

    @Override
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        // Artemis-specific text message
        if (message instanceof ActiveMQTextMessage) {
            ActiveMQTextMessage amqMessage = (ActiveMQTextMessage) message;
            return amqMessage.getText();
        }
        // Any other message is assumed to be a plain JMS TextMessage
        TextMessage textMessage = (TextMessage) message;
        return textMessage.getText();
    }
}
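When I run the producer program, it sends a message to a topic that the client (the Spring part) is listening on, but the client fails to receive the message.
I get the following trace in the log (I also tried SimpleMessageConverter and hit the same problem). As the trace below shows, ActiveMQMessageConsumer is the one throwing this exception.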
[nerContainer1-1] org.apache.activemq.artemis.core.client : readerIndex(22) + length(20) exceeds writerIndex(38): UnpooledDuplicatedByteBuf(ridx: 22, widx: 38, cap: 221, unwrapped: UnpooledUnsafeHeapByteBuf(ridx: 209, widx: 221, cap: 221))@ClientMessageImpl[messageID=3713, durable=true, address=jms.topic.exampleTopic,userID=c0b2a331-de72-11e7-ab14-3e1461f90711,properties=TypedProperties[__AMQ_CID=4,_AMQ_ROUTING_TYPE=0]]
java.lang.IndexOutOfBoundsException: readerIndex(22) + length(20) exceeds writerIndex(38): UnpooledDuplicatedByteBuf(ridx: 22, widx: 38, cap: 221, unwrapped: UnpooledUnsafeHeapByteBuf(ridx: 209, widx: 221, cap: 221))@ClientMessageImpl[messageID=3713, durable=true, address=jms.topic.exampleTopic,userID=c0b2a331-de72-11e7-ab14-3e1461f90711,properties=TypedProperties[__AMQ_CID=4,_AMQ_ROUTING_TYPE=0]]
at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.getMessage(ActiveMQMessageConsumer.java:230) ~[artemis-jms-client-1.5.5.jar:1.5.5]
at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:132) ~[artemis-jms-client-1.5.5.jar:1.5.5]
at org.springframework.jms.support.destination.JmsDestinationAccessor.receiveFromConsumer(JmsDestinationAccessor.java:130) ~[spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:416) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:302) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(20) exceeds writerIndex(38): UnpooledDuplicatedByteBuf(ridx: 22, widx: 38, cap: 221, unwrapped: UnpooledUnsafeHeapByteBuf(ridx: 209, widx: 221, cap: 221))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1395) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1389) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:850) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:858) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
at io.netty.buffer.WrappedByteBuf.readBytes(WrappedByteBuf.java:649) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readSimpleStringInternal(ChannelBufferWrapper.java:93) ~[artemis-commons-1.5.5.jar:1.5.5]
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readNullableSimpleString(ChannelBufferWrapper.java:73) ~[artemis-commons-1.5.5.jar:1.5.5]
at org.apache.activemq.artemis.reader.TextMessageUtil.readBodyText(TextMessageUtil.java:37) ~[artemis-core-client-1.5.5.jar:1.5.5]
at org.apache.activemq.artemis.jms.client.ActiveMQTextMessage.doBeforeReceive(ActiveMQTextMessage.java:112) ~[artemis-jms-client-1.5.5.jar:1.5.5]
at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.getMessage(ActiveMQMessageConsumer.java:224) ~[artemis-jms-client-1.5.5.jar:1.5.5]
... 9 common frames omitted
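To make the exception message easier to read, here is a minimal sketch of the bounds check it describes, using the numbers from the trace above. The class ReadableBytesCheck and its methods are hypothetical stand-ins written for illustration; this is not Netty's actual implementation, only the same arithmetic. With ridx 22 and widx 38 only 16 bytes remain readable, while the text-body reader asks for 20, so the read is rejected before any bytes are copied.

```java
// Hypothetical illustration of the readable-bytes check; not Netty's real code.
final class ReadableBytesCheck {

    /** Number of bytes that can still be read between the two indices. */
    static int readableBytes(int readerIndex, int writerIndex) {
        return writerIndex - readerIndex;
    }

    /** Throws if reading `length` bytes would run past writerIndex. */
    static void checkReadableBytes(int readerIndex, int writerIndex, int length) {
        if (length < 0) {
            throw new IllegalArgumentException("length must be >= 0, was " + length);
        }
        if (readerIndex > writerIndex - length) {
            throw new IndexOutOfBoundsException(String.format(
                    "readerIndex(%d) + length(%d) exceeds writerIndex(%d)",
                    readerIndex, length, writerIndex));
        }
    }

    public static void main(String[] args) {
        // Values copied from the log: ridx 22, widx 38, requested length 20.
        System.out.println(readableBytes(22, 38)); // prints 16
        try {
            checkReadableBytes(22, 38, 20);
        } catch (IndexOutOfBoundsException e) {
            // prints: readerIndex(22) + length(20) exceeds writerIndex(38)
            System.out.println(e.getMessage());
        }
    }
}
```

Read this way, the failure happens while ActiveMQTextMessage.doBeforeReceive decodes the message body, before the message converter is ever called. That would be consistent with SimpleMessageConverter showing the same error, though the sketch itself does not say why the decoded length and the available bytes disagree.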