0

我们在使用单线程路由时遇到问题,从 ActiveMQ 队列中出列消息,以下是路由配置:

<routeContext id="pms2dm-display-routes" xmlns="http://camel.apache.org/schema/spring">
        <!-- Route used to send messages to displays -->
        
        <camel:route id="pms2dm-diplay_entryP1">
            <camel:from uri="jms:queue:displayqueue_POLMONE1?concurrentConsumers=1&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;cacheLevelName=CACHE_NONE&amp;maxConcurrentConsumers=1"/>
                <camel:threads executorServiceRef="displayP1ThreadPoolProfile">
                    <camel:to uri="direct:toDisplay"/>
                </camel:threads>
        </camel:route>

而这是 routeContext 上的线程池配置:

<camel:threadPoolProfile id="displayP1ThreadPoolProfile"
                                 defaultProfile="false" poolSize="1" maxPoolSize="1" keepAliveTime="60"
                                 maxQueueSize="1000" rejectedPolicy="CallerRuns"/>

在将消息出列后,它会经过一些处理,然后我们必须通过套接字(netty4)将其发送到外部系统(我们称之为 ES1),以下是 netty4 参数配置:

    <camel:route id="pms2dm-display-common"  >
        <camel:from uri="direct:toDisplay" />
        <camel:log message="--- PMS2DM Route - pms2dm-display-common: messagge scodato da displayqueue_${header.place}, JMSMESSAGEID: ${header.JMSMessageID} Body --> ${body}"/>
            <camel:onException>
                <exception>java.lang.Exception</exception>
                <handled>
                    <constant>true</constant>
                </handled>
                <setHeader headerName="exceptionMessage">
                    <simple>${exception.message}</simple>
                </setHeader>
                <camel:log message="--- PMS2DM Route - pms2dm-display-common: Exception verificatasi in fase di invio del JMSMESSAGEID: ${header.JMSMessageID} Body --> ${body}; ExceptionMessage --> ${header.exceptionMessage}" />
                 <camel:delay>
                        <constant>1000</constant>
                 </camel:delay>
                <camel:rollback markRollbackOnly="true"/>
            </camel:onException>

            <setProperty propertyName="nettyParams">
                <simple><![CDATA[?disconnect=false&sync=false&synchronous=false&allowDefaultCodec=false&encoder=#byteArrayEncoder&decoder=#byteArrayDecoder&reuseChannel=true]]></simple>
            </setProperty>
            <camel:to uri="bean:displayMessageProcessor"/>
            
            
            <!-- Send command for the start buzzer -->

从参数中可以看出,我们将reuseChannel 设置为true,因为对于我们从AMQ 出列的每条消息,我们需要在将实际消息发送到ES1 之前和之后发送一个start_buzzer 和一个stop_buzzer。所以要明确一点:消息出列,解析,然后将 start_buzzer 发送到 ES1,然后消息出去,最后发送 stop_buzzer。来自 AMQ 的一条消息 = 与 ES1 的套接字上的三个通信。套接字上的所有请求都使用接收者列表组件发送:

<camel:recipientList><exchangeProperty>displayConnection</exchangeProperty>
                        </camel:recipientList>

(displayConnection 是我们在同一路由上的前一个处理器中设置的属性,它包含整个 netty4 uri)

在测试环境中,我们实际上没有测试 ES1,因此我们使用在另一个 linus 服务器上打开的 NCAT 来模拟它,以便重现实际的套接字侦听器。以下场景给使用带来了一些问题: 从 AMQ 出列 1 条消息后,它开始通过路由;路由处理后发送start_buzzer给NCAT,NCAT接收;此时,在我们看到 NCAT 上接收到的实际消息之前,我们将其关闭 (CTRL+C);从这里开始,我们在 Apache Camel 日志(在 TRACE 级别)上看到路由继续进一步处理 Exchange:它显然将消息发送出去(即使 NCAT 已关闭),然后在发送 stop_buzzer 之前,路由本身似乎走了上锁。

从此时起,不再有消息从 AMQ 出列。我们在日志中看到的是:

[Camel Thread #18 - NettyClientTCPWorker] o.a.c.c.n.NettyProducer [NettyProducer.java:302] Operation complete DefaultChannelPromise@1f6f5ec3(failure: java.io.IOException: Broken pipe)
[Camel Thread #18 - NettyClientTCPWorker] o.a.c.c.n.NettyProducer [ClientChannelHandler.java:102] Channel closed: [id: 0xb941777d, L:/192.168.181.177:42584 ! R:/192.168.181.178:1471]

(在 R 上是 ES1,在 L 上当然是我们的 Apache Camel)

似乎将 reuseChannel 设置为 true 会破坏通道(当 ncat 不再可达时),尽管 Camel 本身不会重新创建通道。这似乎锁定了我们在路由上允许的唯一 JMS 消费者。这里有什么方法可以强制解锁 JMS 消费者吗?为什么会这样?如果通道坏了,为什么 IOException 没有被我们的 onException 组件捕获?

信息:单线程池和 maxConsumer=1 是强制配置;骆驼版本是2.17。

4

0 回答 0