0

通过 JMS 连接到 ActiveMQ Artemis 时,我在基于 Apache Camel 的应用程序中遇到问题。在 Camel 路由之一的末尾,消息存储在 Artemis JMS 队列中。在同一应用程序中运行的遗留组件会定期使用ConsumerTemplate.

这适用于带有纯文本正文的 Camel 消息,但在使用字节数组正文时会导致错误:似乎 Artemis 将任何带有字节正文的消息视为“大消息”,这些消息是流式传输的,而不是保存在内存中。通过ConsumerTemplate作品接收,但一旦访问正文或标题,就会引发如下异常:

org.apache.camel.RuntimeCamelException: Failed to extract body due to: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session. Message: ActiveMQMessage[ID:90c4d1d5-3233-11ea-b0cc-44032c68a56f]:PERSISTENT/ClientLargeMessageImpl[messageID=2974, durable=true, address=mytest,userID=90c4d1d5-3233-11ea-b0cc-44032c68a56f,properties=TypedProperties[firedTime=Wed Jan 08 17:26:03 CET 2020,__AMQ_CID=90b4f34e-3233-11ea-b0cc-44032c68a56f,breadcrumbId=ID-NB045-evolit-co-at-1578500762151-0-1,_AMQ_ROUTING_TYPE=1,_AMQ_LARGE_SIZE=3]]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:172) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsMessage.createBody(JmsMessage.java:221) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:54) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.example.cdi.JmsPoller.someMethod(JmsPoller.java:36) ~[classes/:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_171]
        at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:188) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79) [camel-core-2.22.1.jar:2.22.1]
        at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_171]
        at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_171]
Caused by: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
        at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
        ... 21 more
Caused by: org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
        at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
        ... 21 more

对于不超过minLargeMessageSizeArtemis 的消息,在测试程序中甚至 3 个字节也会出现此问题。

巧合的是,在用于测试应用程序的独立应用程序中也出现了同样的问题。在那里,我能够通过保持 JMS 会话和接收器打开直到完全读取 JMS 消息正文和标头来解决问题。JmsTemplate使用 Camel,它在Camel 所基于的 Spring 中被抽象出来。


我查阅了Camel JMS 组件的用户文档,以找到可能对我有帮助的配置选项。我尝试了以下方法:

  • eagerLoadingOfProperties=true消费者方面:没有影响,只是似乎有影响MessageListenerContainer。文档说:

它使用 [...] Spring 的 JmsTemplate 进行发送,使用 MessageListenerContainer 进行消费。

但是,在调试时,它似乎MessageListenerContainer仅在使用 Camel 路由中来自 JMS 端点的消息时使用。在我的情况下使用ConsumerTemplatelikeJmsTemplate用于消费。

  • messageConverter在消费者方面mapJmsMessage:没有效果,它们在会话已经关闭时执行
  • alwaysCopyMessage在生产者方面:我认为复制可能会阻止使用流式大消息,没有效果
  • streamMessageTypeEnabled=false在生产者方面:没有影响
  • jmsMessageType=Bytes在生产者和消费者方面:没有影响
  • transferExchange=true在生产者和消费者方面:这似乎确实解决了我的具体情况,但感觉像是一种解决方法。文档建议谨慎使用该选项。

所以现在,这transferExchange似乎是我最好的选择,假设它真的解决了我在所有测试用例中的问题。不过,我很高兴能更好地理解这个问题或不同的解决方案:

  1. 为什么 Artemis 将小字节数组消息视为大消息?
  2. Camel ConsumerTemplate 是否完全支持流式大消息?

我的版本是 Camel 2.22.1 和 Artemis 2.10.1。


我已经能够通过修改 Camelcamel-example-cdi发布包中的 Camel 示例来重现我的问题,使其具有如下所示的最小类。此外,我添加了camel-jmsArtemis 依赖项并在本地启动了 Artemis,两者都如camel-example-artemis-large-messages示例中所述。

public class MyRoutes extends RouteBuilder {

    @Override
    public void configure() {
        setupJmsComponent();

        from("timer:writeTimer?period=6000")
                .log("writing to JMS")
                .setBody(() -> new byte[]{0,1,2})
                .to(JmsPoller.ENDPOINT);

        from("timer:pollTimer?period=3000")
            .to("bean:jmsPoller");
    }

    private void setupJmsComponent() {
        ActiveMQJMSConnectionFactory connectionFactory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        getContext().addComponent("jms", jmsComponent);
    }

}
@Singleton
@Named("jmsPoller")
public class JmsPoller {
    static final String ENDPOINT = "jms:queue:mytest";

    @Inject
    private ConsumerTemplate consumerTemplate;

    public void someMethod(String body) {
        Exchange exchange = consumerTemplate.receive(ENDPOINT, 1000L);
        System.out.println("Received " + (exchange == null ? null : exchange.getIn().getBody()));
    }

}
4

1 回答 1

0

ActiveMQ Artemis 不会将任何带有字节正文的消息视为“大”消息。值得注意的是,代理最终将所有消息体视为一个字节数组,因为它们正是如此。然而,为了被认为是“大”的消息必须超过一定的大小。该文档指出:

任何大于特定大小的消息都被视为大消息。大消息将被拆分并以片段的形式发送。这是由 URL 参数决定的minLargeMessageSize

注意

Apache ActiveMQ Artemis 消息使用每个字符 2 个字节进行编码,因此如果消息数据用 ASCII 字符(1 个字节)填充,则生成的 Apache ActiveMQ Artemis 消息的大小大约会加倍。这在计算“大”消息的大小时很重要,因为它可能看起来小于minLargeMessageSize发送之前的大小,但一旦编码,它就会变成“大”消息。

默认值为 100KiB。

看起来应用程序的用例根本不符合 ActiveMQ Artemis 中大消息支持的语义,因为消息来自的会话在消息的正文被完全接收之前被关闭。

因此,我建议您在读取正文之前保持会话打开,或者在发送minLargeMessageSize消息的应用程序的 URL 上增加,这样任何消息都不会被视为“大”。后一个选项可能会导致代理上更多的内存使用,因为整个消息体将立即保存在内存中。

于 2020-01-08T19:42:10.183 回答