0

我在 ActiveMQ 中有一个队列,并希望使用 spring 集成将消息从其中删除到我们的应用程序中。我们已经部署了两次应用程序(以防其中一个失败)——每条消息应该只由一个应用程序处理。此外,我需要一个 jms tx 管理器,以防在消息处理期间发生致命的应用程序故障。因此,我的通道适配器如下所示:

<int-jms:message-driven-channel-adapter 
    channel="myChannel" 
    connection-factory="jmsConnectionFactory"
    pub-sub-domain="false"
    destination-name="MY_QUEUE" 
    transaction-manager="jmsTxManager" />

当是直接通道时,这一切都很好myChannel,但是我想使用任务执行器,以便可以一次处理许多消息。

为了迎合致命的应用程序故障,我认为集合通道可能是要走的路(我相信当任务执行器中的线程空闲时,通道适配器将转到活动 mq 以检索另一条消息),因为不会t 是通道上内存中保存的任何消息。这似乎不起作用,并且以下代码抛出TaskRejectException

<int:channel id="myChannel">
    <int:rendezvous-queue />
</int:channel>
<task:executor id="taskExecutor" pool-size="2" queue-capacity="0" />
<int:router input-channel="myChannel" expression="payload.getType() + 'Channel'">
    <int:poller fixed-rate="1000" task-executor="taskExecutor" />
</int:router>

位于路由器之后的服务激活器,同步处理,需要 10 秒来处理,所以我希望在 t=0s 时消失并检索 2 条消息(线程池的大小),处理它们,将线程释放到t=10s 的线程池,然后再次向活动 mq 请求消息。然而,似乎在 t=0s 检索到超过 2 条消息。

谁能建议我应该做什么?

4

1 回答 1

4

为了使交易生效,您必须使用直接渠道。

message-driven-channel-adapter您可以使用(concurrent-consumersmax-concurrent-consumers)上的并发属性来控制并发线程的数量。

于 2013-08-08T17:42:33.757 回答