我正在使用死信通道 EIP,完全按照死信通道的骆驼文档中的描述。这是我的 camel.xml(已删除标题)
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<!-- this is the Dead Letter Channel error handler, where we send failed message to a log endpoint -->
<route errorHandlerRef="myDeadLetterErrorHandler">
<from uri="jms:q1" />
<doTry>
<validate><simple>${body[subsc]} regex '[a-z0-9]{8}' </simple> </validate>
<choice>
<when>
<simple>${body[key1]} regex '^(v1)$'</simple>
<choice>
<when>
<simple>${body[key2]} == 'v2'</simple>
<to uri="jms:getInfo" />
</when>
</choice>
</when>
<otherwise>
<to uri="jms:invalid" />
<stop />
</otherwise>
</choice>
<doCatch>
<exception>org.apache.camel.ValidationException</exception>
<to uri="jms:invalid"/>
</doCatch>
</doTry>
</route>
<route>
<from uri="jms:queue:deadlc" />
<log message="Got ${body}" loggingLevel="ERROR" logName="cool" />
</route>
</camelContext>
<!--
Lets configure some Camel endpoints
http://camel.apache.org/components.html
-->
<!-- configure the camel activemq component to use the current broker -->
<bean id="confact" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://amq-broker?create=false"/>
<property name="userName" value="${activemq.username}"/>
<property name="password" value="${activemq.password}"/>
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
<property name="connectionFactory" ref="confact" />
</bean>
<bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start"
destroy-method="stop">
<property name="maxConnections" value="8" />
<property name="connectionFactory" ref="confact" />
<property name="expiryTimeout" value="-1" />
<property name="idleTimeout" value="-1" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<property name="concurrentConsumers" value="8"/>
</bean>
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig"/>
</bean>
<bean id="myDeadLetterErrorHandler" class="org.apache.camel.builder.DeadLetterChannelBuilder">
<property name="deadLetterUri" value="jms:queue:deadlc"/>
<property name="redeliveryPolicy" ref="myRedeliveryPolicyConfig"/>
</bean>
<bean id="myRedeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy">
<property name="maximumRedeliveries" value="3"/>
<property name="redeliveryDelay" value="5000"/>
</bean>
我只有一个具有基于内容的路由器的路由,它基本上说如果消息体有 getInfo 然后从 jms:foo 路由到 jms:getInfo,如果消息体有 performAction 然后从 jms.foo 路由到 jms:performAction
我希望当 jms:getInfo 消费者未运行时,交付将失败,重新交付尝试将如 spring XML 中所指定。但是什么也没有发生,也没有任何东西进入死信队列,而是 activemq 抛出了如下所示的异常。
有人可以解释为什么它不起作用吗?我的理解是,当消费者(将从基于内容的路由器接收消息的目的地)没有运行时,acivemq 知道它甚至不应该传递它并且应该立即抛出一个异常,然后由骆驼的死信通道处理按照配置。
我正在使用骆驼 2.10.3 和 activemq 5.8.0。
我怀疑 activemq 正在处理失败的消息传递,而忽略了死信队列的骆驼 xml 配置,因为这些失败的消息显示在 activemq Web 控制台的 Activemq.DLQ 中。如果消费者正在运行,则消息根据 camel.xml 中指定的路由成功传递。
0:58:46,317 | 调试 | 发送 JMS 消息到:queue://getInfo 消息:ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0 ,时间戳 = 0,到达 = 0,brokerInTime = 0,brokerOutTime = 0,correlationId = a79a0f9e-c88f-4501-a56d-2d5667aa98e0,replyTo = temp-queue://ID:myhost-57639-1382233787365-3:3:6 ,persistent = false,type = null,priority = 4,groupID = null,groupSequence = 0,targetConsumerId = null,compressed = false,userID = null,content = null,marshalledProperties = null,dataStructure = null,redeliveryCounter = 0,size = 0, 属性 = {breadcrumbId=ID:myhost-57439-1382232345453-1:1:1:1:6, CamelJmsDeliveryMode=1}, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {cluster=x78} } | org.apache.camel.component.jms.JmsConfiguration | Camel(骆驼)线程 #2 - JmsConsumer[bar] 20:58:48,044 | 调试 | 驱逐非活动条目 ID:8b0d77a3-afef-4612-a49f-22c1b81eb80e(20000 毫秒后超时)| org.apache.camel.component.jms.reply.CorrelationTimeoutMap | 骆驼(骆驼)线程 #9 - JmsReplyManagerTimeoutChecker[getInfo] 20:58:48,044 | 警告 | 等待具有相关 ID [8b0d77a3-afef-4612-a49f-22c1b81eb80e] 的回复消息 20000 毫秒后发生超时。设置 ExchangeTimedOutException on (MessageId: ID:myhost-57439-1382232345453-1:1:1:1:5 on ExchangeId: ID-myhost-57640-1382233788284-0-2) 并继续路由。| org.apache.camel.component.jms.reply.TemporaryQueueReplyManager | Camel(骆驼)线程 #9 - JmsReplyManagerTimeoutChecker[getInfo] 20:58:48,045 | 警告 | JMS 消息侦听器的执行失败。原因:[org.apache.camel.RuntimeCamelException - org.apache.camel.ExchangeTimedOutException:未收到 OUT 消息:20000 毫秒到期回复消息,相关 ID:8b0d77a3-afef-4612-a49f-22c1b81eb80e 未收到。交换[JmsMessage[JmsMessageID: ID:myhost-57439-1382232345453-1:1:1:1:5]]] | org.apache.camel.component.jms.EndpointMessageListener | 骆驼(骆驼)线程#4-JmsConsumer [bar] org.apache.camel.RuntimeCamelException:org.apache.camel.ExchangeTimedOutException:未收到OUT消息:20000毫秒到期回复消息,相关ID:8b0d77a3-afef-4612-未收到 a49f-22c1b81eb80e。Exchange[JmsMessage[JmsMessageID: ID:myhost-57439-1382232345453-1:1:1:1:5]] 在 org.apache.camel.util.ObjectHelper。ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) 原因:org.apache.camel.ExchangeTimedOutException:未收到 OUT 消息:20000 毫秒到期回复未收到相关 ID 为 8b0d77a3-afef-4612-a49f-22c1b81eb80e 的消息。Exchange[JmsMessage[JmsMessageID: ID:myhost-57439-1382232345453-1:1:1:1:5]] 在 org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:133) 在 org .apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:61) 在 org.apache.camel.component.jms.reply.CorrelationTimeoutMap.onEviction(CorrelationTimeoutMap.java:53) 在 org.apache.camel .component.jms.reply.CorrelationTimeoutMap.onEviction(CorrelationTimeoutMap.java: