0

Artemis 消息生产者 Java 程序:

// Step 1. Create the JNDI initial context used for the look-ups below
initialContext = new InitialContext();

         // Step 2. Look-up the JMS topic
         Topic topic = (Topic) initialContext.lookup("topic/exampleTopic");

         // Step 3. Look-up the JMS connection factory
         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");

         // Step 4. Create a JMS connection
         connection = cf.createConnection();

         // Step 5. Set the client-id on the connection (done right after creation, before start())
         connection.setClientID("durable-client");

         // Step 6. Start the connection
         connection.start();

         // Step 7. Create a JMS session (non-transacted, auto-acknowledge)
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

         // Step 8. Create a JMS message producer bound to the topic
         MessageProducer messageProducer = session.createProducer(topic);
         // Step 9. Create the text message to publish
         TextMessage message1 = session.createTextMessage("This is a text message 1");
         // Step 10. Send the text message to the topic
         messageProducer.send(message1);

......

客户端(使用 Spring)->

 /**
  * Exposes the Artemis transport configuration as the "amqTransportConfiguration" bean.
  *
  * @return a transport configuration that uses the Netty connector factory with the
  *         parameters produced by {@code getParams("61616")}
  */
 @Bean("amqTransportConfiguration")
        public TransportConfiguration amqTransportConfiguration() {
            // Resolve the connector factory class name first, then the connector parameters.
            final String connectorFactoryClassName = NettyConnectorFactory.class.getName();
            // NOTE(review): "61616" is presumably the broker port - confirm against getParams.
            return new TransportConfiguration(connectorFactoryClassName, getParams("61616"));
        }


        /**
         * Exposes the Artemis JMS connection factory as the "connectionFactory" bean.
         *
         * @param transportConfiguration the transport configuration bean qualified as
         *                               "amqTransportConfiguration"
         * @return a connection factory with user, password and client id pre-configured
         * @throws JMSException declared for callers; kept as part of the existing signature
         */
        @Bean("connectionFactory")
        public ConnectionFactory activeMQJMSConnectionFactory(@Qualifier("amqTransportConfiguration") TransportConfiguration transportConfiguration) throws JMSException {
            // NOTE(review): the first constructor argument is presumably the HA flag - confirm.
            final ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(false, transportConfiguration);

            // Credentials used for every connection created by this factory.
            factory.setUser("xxxx");
            factory.setPassword("xxxxx");

            // Client id applied to connections created by this factory.
            factory.setClientID(clientServer0Id);
            return factory;
        }

        /**
         * Builds the message listener container that delivers messages from the topic to
         * the given consumer, using a durable subscription.
         *
         * @param connectionFactory the "connectionFactory" bean used to obtain connections
         * @param consumer          the listener that receives the messages
         * @param messageConverter  the converter handed to the container builder
         * @param topic             the "topic" bean to subscribe to
         * @return the container produced by the builder
         */
        @Bean
        public MessageListenerContainer listenerContainer1(@Qualifier("connectionFactory") ConnectionFactory connectionFactory, Consumer consumer, ObjectMessageConverter messageConverter, @Qualifier("topic") Topic topic) {
            return new DefaultMessageListenerContainerBuilder().connectionFactory(connectionFactory)
                    .destination(topic)
                    .messageConverter(messageConverter)
                    .messageListener(consumer)
                    // NOTE(review): 1 is presumably javax.jms.Session.AUTO_ACKNOWLEDGE - confirm and prefer the constant.
                    .sessionAcknowledgeMode(1)
                    .pubSubDomain(true)
                    .subscriptionDurable(true)
                    // NOTE(review): the same client id is also set on the connection factory bean;
                    // confirm the container does not try to set it a second time on the connection.
                    .clientId(clientServer0Id)
                    .durableSubscriptionName(subscriptionName)
                    .build();
        }

对象消息转换器->

@Component
public class ObjectMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        return session.createObjectMessage((Serializable) object);
    }

    @Override
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        if (message instanceof ActiveMQTextMessage) {
            ActiveMQTextMessage objMessage = (ActiveMQTextMessage) message;
            return ((ActiveMQTextMessage) message).getText();
        }
        TextMessage textMessage = (TextMessage) message;
        return textMessage.getText();
    }

}

当我运行生产者程序向某个主题发送一条消息时,客户端(Spring 部分)正在监听该主题,但无法成功接收到消息。

日志中出现以下堆栈跟踪(我也尝试过使用 SimpleMessageConverter,但问题依旧)。从下面的日志可以看到,该异常是由 ActiveMQMessageConsumer 抛出的。

[nerContainer1-1] org.apache.activemq.artemis.core.client  : readerIndex(22) + length(20) exceeds writerIndex(38): UnpooledDuplicatedByteBuf(ridx: 22, widx: 38, cap: 221, unwrapped: UnpooledUnsafeHeapByteBuf(ridx: 209, widx: 221, cap: 221))@ClientMessageImpl[messageID=3713, durable=true, address=jms.topic.exampleTopic,userID=c0b2a331-de72-11e7-ab14-3e1461f90711,properties=TypedProperties[__AMQ_CID=4,_AMQ_ROUTING_TYPE=0]]

        java.lang.IndexOutOfBoundsException: readerIndex(22) + length(20) exceeds writerIndex(38): UnpooledDuplicatedByteBuf(ridx: 22, widx: 38, cap: 221, unwrapped: UnpooledUnsafeHeapByteBuf(ridx: 209, widx: 221, cap: 221))@ClientMessageImpl[messageID=3713, durable=true, address=jms.topic.exampleTopic,userID=c0b2a331-de72-11e7-ab14-3e1461f90711,properties=TypedProperties[__AMQ_CID=4,_AMQ_ROUTING_TYPE=0]]
            at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.getMessage(ActiveMQMessageConsumer.java:230) ~[artemis-jms-client-1.5.5.jar:1.5.5]
            at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:132) ~[artemis-jms-client-1.5.5.jar:1.5.5]
            at org.springframework.jms.support.destination.JmsDestinationAccessor.receiveFromConsumer(JmsDestinationAccessor.java:130) ~[spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
            at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:416) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
            at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:302) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
            at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
            at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
            at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
            at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055) [spring-jms-4.3.12.RELEASE.jar:4.3.12.RELEASE]
            at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
        Caused by: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(20) exceeds writerIndex(38): UnpooledDuplicatedByteBuf(ridx: 22, widx: 38, cap: 221, unwrapped: UnpooledUnsafeHeapByteBuf(ridx: 209, widx: 221, cap: 221))
            at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1395) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
            at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1389) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
            at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:850) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
            at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:858) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
            at io.netty.buffer.WrappedByteBuf.readBytes(WrappedByteBuf.java:649) ~[netty-all-4.1.5.Final.jar:4.1.5.Final]
            at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readSimpleStringInternal(ChannelBufferWrapper.java:93) ~[artemis-commons-1.5.5.jar:1.5.5]
            at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readNullableSimpleString(ChannelBufferWrapper.java:73) ~[artemis-commons-1.5.5.jar:1.5.5]
            at org.apache.activemq.artemis.reader.TextMessageUtil.readBodyText(TextMessageUtil.java:37) ~[artemis-core-client-1.5.5.jar:1.5.5]
            at org.apache.activemq.artemis.jms.client.ActiveMQTextMessage.doBeforeReceive(ActiveMQTextMessage.java:112) ~[artemis-jms-client-1.5.5.jar:1.5.5]
            at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.getMessage(ActiveMQMessageConsumer.java:224) ~[artemis-jms-client-1.5.5.jar:1.5.5]
            ... 9 common frames omitted
4

0 回答 0