我们在使用单线程路由时遇到问题,从 ActiveMQ 队列中出列消息,以下是路由配置:
<routeContext id="pms2dm-display-routes" xmlns="http://camel.apache.org/schema/spring">
<!-- Route used to send messages to displays -->
<camel:route id="pms2dm-diplay_entryP1">
<camel:from uri="jms:queue:displayqueue_POLMONE1?concurrentConsumers=1&acknowledgementModeName=CLIENT_ACKNOWLEDGE&cacheLevelName=CACHE_NONE&maxConcurrentConsumers=1"/>
<camel:threads executorServiceRef="displayP1ThreadPoolProfile">
<camel:to uri="direct:toDisplay"/>
</camel:threads>
</camel:route>
而这是 routeContext 上的线程池配置:
<camel:threadPoolProfile id="displayP1ThreadPoolProfile"
defaultProfile="false" poolSize="1" maxPoolSize="1" keepAliveTime="60"
maxQueueSize="1000" rejectedPolicy="CallerRuns"/>
在将消息出列后,它会经过一些处理,然后我们必须通过套接字(netty4)将其发送到外部系统(我们称之为 ES1),以下是 netty4 参数配置:
<camel:route id="pms2dm-display-common" >
<camel:from uri="direct:toDisplay" />
<camel:log message="--- PMS2DM Route - pms2dm-display-common: messagge scodato da displayqueue_${header.place}, JMSMESSAGEID: ${header.JMSMessageID} Body --> ${body}"/>
<camel:onException>
<exception>java.lang.Exception</exception>
<handled>
<constant>true</constant>
</handled>
<setHeader headerName="exceptionMessage">
<simple>${exception.message}</simple>
</setHeader>
<camel:log message="--- PMS2DM Route - pms2dm-display-common: Exception verificatasi in fase di invio del JMSMESSAGEID: ${header.JMSMessageID} Body --> ${body}; ExceptionMessage --> ${header.exceptionMessage}" />
<camel:delay>
<constant>1000</constant>
</camel:delay>
<camel:rollback markRollbackOnly="true"/>
</camel:onException>
<setProperty propertyName="nettyParams">
<simple><![CDATA[?disconnect=false&sync=false&synchronous=false&allowDefaultCodec=false&encoder=#byteArrayEncoder&decoder=#byteArrayDecoder&reuseChannel=true]]></simple>
</setProperty>
<camel:to uri="bean:displayMessageProcessor"/>
<!-- Send command for the start buzzer -->
从参数中可以看出,我们将reuseChannel 设置为true,因为对于我们从AMQ 出列的每条消息,我们需要在将实际消息发送到ES1 之前和之后发送一个start_buzzer 和一个stop_buzzer。所以要明确一点:消息出列,解析,然后将 start_buzzer 发送到 ES1,然后消息出去,最后发送 stop_buzzer。来自 AMQ 的一条消息 = 与 ES1 的套接字上的三个通信。套接字上的所有请求都使用接收者列表组件发送:
<camel:recipientList><exchangeProperty>displayConnection</exchangeProperty>
</camel:recipientList>
(displayConnection 是我们在同一路由上的前一个处理器中设置的属性,它包含整个 netty4 uri)
在测试环境中,我们实际上没有测试 ES1,因此我们使用在另一个 linus 服务器上打开的 NCAT 来模拟它,以便重现实际的套接字侦听器。以下场景给使用带来了一些问题: 从 AMQ 出列 1 条消息后,它开始通过路由;路由处理后发送start_buzzer给NCAT,NCAT接收;此时,在我们看到 NCAT 上接收到的实际消息之前,我们将其关闭 (CTRL+C);从这里开始,我们在 Apache Camel 日志(在 TRACE 级别)上看到路由继续进一步处理 Exchange:它显然将消息发送出去(即使 NCAT 已关闭),然后在发送 stop_buzzer 之前,路由本身似乎走了上锁。
从此时起,不再有消息从 AMQ 出列。我们在日志中看到的是:
[Camel Thread #18 - NettyClientTCPWorker] o.a.c.c.n.NettyProducer [NettyProducer.java:302] Operation complete DefaultChannelPromise@1f6f5ec3(failure: java.io.IOException: Broken pipe)
[Camel Thread #18 - NettyClientTCPWorker] o.a.c.c.n.NettyProducer [ClientChannelHandler.java:102] Channel closed: [id: 0xb941777d, L:/192.168.181.177:42584 ! R:/192.168.181.178:1471]
(在 R 上是 ES1,在 L 上当然是我们的 Apache Camel)
似乎将 reuseChannel 设置为 true 会破坏通道(当 ncat 不再可达时),尽管 Camel 本身不会重新创建通道。这似乎锁定了我们在路由上允许的唯一 JMS 消费者。这里有什么方法可以强制解锁 JMS 消费者吗?为什么会这样?如果通道坏了,为什么 IOException 没有被我们的 onException 组件捕获?
信息:单线程池和 maxConsumer=1 是强制配置;骆驼版本是2.17。