通过 JMS 连接到 ActiveMQ Artemis 时,我在基于 Apache Camel 的应用程序中遇到问题。在 Camel 路由之一的末尾,消息存储在 Artemis JMS 队列中。在同一应用程序中运行的遗留组件会定期使用ConsumerTemplate
.
这适用于带有纯文本正文的 Camel 消息,但在使用字节数组正文时会导致错误:似乎 Artemis 将任何带有字节正文的消息视为“大消息”,这些消息是流式传输的,而不是保存在内存中。通过ConsumerTemplate
作品接收,但一旦访问正文或标题,就会引发如下异常:
org.apache.camel.RuntimeCamelException: Failed to extract body due to: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session. Message: ActiveMQMessage[ID:90c4d1d5-3233-11ea-b0cc-44032c68a56f]:PERSISTENT/ClientLargeMessageImpl[messageID=2974, durable=true, address=mytest,userID=90c4d1d5-3233-11ea-b0cc-44032c68a56f,properties=TypedProperties[firedTime=Wed Jan 08 17:26:03 CET 2020,__AMQ_CID=90b4f34e-3233-11ea-b0cc-44032c68a56f,breadcrumbId=ID-NB045-evolit-co-at-1578500762151-0-1,_AMQ_ROUTING_TYPE=1,_AMQ_LARGE_SIZE=3]]
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:172) ~[camel-jms-2.22.1.jar:2.22.1]
at org.apache.camel.component.jms.JmsMessage.createBody(JmsMessage.java:221) ~[camel-jms-2.22.1.jar:2.22.1]
at org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:54) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.example.cdi.JmsPoller.someMethod(JmsPoller.java:36) ~[classes/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_171]
at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:188) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) [camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197) [camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79) [camel-core-2.22.1.jar:2.22.1]
at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_171]
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_171]
Caused by: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
... 21 more
Caused by: org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
... 21 more
对于不超过minLargeMessageSize
Artemis 的消息,在测试程序中甚至 3 个字节也会出现此问题。
巧合的是,在用于测试应用程序的独立应用程序中也出现了同样的问题。在那里,我能够通过保持 JMS 会话和接收器打开直到完全读取 JMS 消息正文和标头来解决问题。JmsTemplate
使用 Camel,它在Camel 所基于的 Spring 中被抽象出来。
我查阅了Camel JMS 组件的用户文档,以找到可能对我有帮助的配置选项。我尝试了以下方法:
eagerLoadingOfProperties=true
消费者方面:没有影响,只是似乎有影响MessageListenerContainer
。文档说:
它使用 [...] Spring 的 JmsTemplate 进行发送,使用 MessageListenerContainer 进行消费。
但是,在调试时,它似乎MessageListenerContainer
仅在使用 Camel 路由中来自 JMS 端点的消息时使用。在我的情况下使用ConsumerTemplate
likeJmsTemplate
用于消费。
messageConverter
在消费者方面mapJmsMessage
:没有效果,它们在会话已经关闭时执行alwaysCopyMessage
在生产者方面:我认为复制可能会阻止使用流式大消息,没有效果streamMessageTypeEnabled=false
在生产者方面:没有影响jmsMessageType=Bytes
在生产者和消费者方面:没有影响transferExchange=true
在生产者和消费者方面:这似乎确实解决了我的具体情况,但感觉像是一种解决方法。文档建议谨慎使用该选项。
所以现在,这transferExchange
似乎是我最好的选择,假设它真的解决了我在所有测试用例中的问题。不过,我很高兴能更好地理解这个问题或不同的解决方案:
- 为什么 Artemis 将小字节数组消息视为大消息?
- Camel ConsumerTemplate 是否完全支持流式大消息?
我的版本是 Camel 2.22.1 和 Artemis 2.10.1。
我已经能够通过修改 Camelcamel-example-cdi
发布包中的 Camel 示例来重现我的问题,使其具有如下所示的最小类。此外,我添加了camel-jms
Artemis 依赖项并在本地启动了 Artemis,两者都如camel-example-artemis-large-messages
示例中所述。
public class MyRoutes extends RouteBuilder {
@Override
public void configure() {
setupJmsComponent();
from("timer:writeTimer?period=6000")
.log("writing to JMS")
.setBody(() -> new byte[]{0,1,2})
.to(JmsPoller.ENDPOINT);
from("timer:pollTimer?period=3000")
.to("bean:jmsPoller");
}
private void setupJmsComponent() {
ActiveMQJMSConnectionFactory connectionFactory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
JmsComponent jmsComponent = new JmsComponent();
jmsComponent.setConnectionFactory(connectionFactory);
getContext().addComponent("jms", jmsComponent);
}
}
@Singleton
@Named("jmsPoller")
public class JmsPoller {
static final String ENDPOINT = "jms:queue:mytest";
@Inject
private ConsumerTemplate consumerTemplate;
public void someMethod(String body) {
Exchange exchange = consumerTemplate.receive(ENDPOINT, 1000L);
System.out.println("Received " + (exchange == null ? null : exchange.getIn().getBody()));
}
}