1

我有两个 Java 进程,其中第一个生成消息并将它们放入 ActiveMQ 队列。第二个进程(消费者)使用 Spring Integration 从队列中获取消息并在线程中处理它们。

我有两个要求:

  1. 消费者应该有 3 个处理线程。如果我有 10 条消息通过队列进入,我想让 3 个线程处理前 3 条消息,而其他 7 条消息应该被缓冲。

  2. 当消费者在某些消息尚未处理时停止时,它应该在重新启动后继续处理消息。

这是我的配置:

<bean id="messageActiveMqQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="example.queue" />
</bean>

<int-jms:message-driven-channel-adapter
    destination="messageActiveMqQueue" channel="incomingMessageChannel" />

<int:channel id="incomingMessageChannel">
    <int:dispatcher task-executor="incomingMessageChannelExecutor" />
</int:channel>

<bean id="incomingMessageChannelExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="daemon" value="false" />
    <property name="maxPoolSize" value="3" />
</bean>

<int:service-activator input-channel="incomingMessageChannel"
    ref="myMessageProcessor" method="processMessage" />

第一个要求按预期工作。我产生 10 条消息,并且 3 个 myMessageProcessors 开始处理每条消息。一旦第一条消息完成,就处理第四条消息。

但是,当我在处理所有消息之前杀死消费者时,这些消息就会丢失。重新启动后,消费者不会再次收到这些消息。

我认为在上述配置中,这是因为 ThreadPoolTask​​Executor 生成的线程将消息排队。所以消息已经从incomingMessageChannel 中删除。因此我尝试设置incomingMessageChannelExecutor的队列容量:

<property name="queueCapacity" value="0" />

但是现在当我收到超过 3 条消息时会收到错误消息:

2013-06-12 11:47:52,670 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Execution of JMS message listener failed, and no ErrorHandler has been set.
org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'incomingMessageChannel'

我也尝试将 更改message-driven-channel-adapterinbound-gateway,但这给了我同样的错误。

我是否必须在 中设置错误处理程序inbound-gateway,以便错误返回 ActiveMQ 队列?如果 ThreadPoolTask​​Executor 没有空闲线程,我如何配置队列以便将消息保留在队列中?

提前致谢,

本尼迪克特

4

1 回答 1

2

不; 而不是使用执行器通道,您应该使用<message-driven-channel-adapter/>.

<dispatcher/>从通道中取出并设置concurrent-consumers="3"在适配器上。

于 2013-06-12T10:41:23.607 回答