我有两个 Java 进程,其中第一个生成消息并将它们放入 ActiveMQ 队列。第二个进程(消费者)使用 Spring Integration 从队列中获取消息并在线程中处理它们。
我有两个要求:
消费者应该有 3 个处理线程。如果我有 10 条消息通过队列进入,我想让 3 个线程处理前 3 条消息,而其他 7 条消息应该被缓冲。
当消费者在某些消息尚未处理时停止时,它应该在重新启动后继续处理消息。
这是我的配置:
<bean id="messageActiveMqQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="example.queue" />
</bean>
<int-jms:message-driven-channel-adapter
destination="messageActiveMqQueue" channel="incomingMessageChannel" />
<int:channel id="incomingMessageChannel">
<int:dispatcher task-executor="incomingMessageChannelExecutor" />
</int:channel>
<bean id="incomingMessageChannelExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="daemon" value="false" />
<property name="maxPoolSize" value="3" />
</bean>
<int:service-activator input-channel="incomingMessageChannel"
ref="myMessageProcessor" method="processMessage" />
第一个要求按预期工作。我产生 10 条消息,并且 3 个 myMessageProcessors 开始处理每条消息。一旦第一条消息完成,就处理第四条消息。
但是,当我在处理所有消息之前杀死消费者时,这些消息就会丢失。重新启动后,消费者不会再次收到这些消息。
我认为在上述配置中,这是因为 ThreadPoolTaskExecutor 生成的线程将消息排队。所以消息已经从incomingMessageChannel 中删除。因此我尝试设置incomingMessageChannelExecutor的队列容量:
<property name="queueCapacity" value="0" />
但是现在当我收到超过 3 条消息时会收到错误消息:
2013-06-12 11:47:52,670 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Execution of JMS message listener failed, and no ErrorHandler has been set.
org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'incomingMessageChannel'
我也尝试将 更改message-driven-channel-adapter
为inbound-gateway
,但这给了我同样的错误。
我是否必须在 中设置错误处理程序inbound-gateway
,以便错误返回 ActiveMQ 队列?如果 ThreadPoolTaskExecutor 没有空闲线程,我如何配置队列以便将消息保留在队列中?
提前致谢,
本尼迪克特