我是使用 JMS/ActiveMQ 的新手,我有一个 Spring/Hibernate 应用程序,它从 ActiveMQ 中的队列中获取消息并处理这些消息以实现持久性。由于消息需要一段时间来处理和持久化,我将我的 DefaultMessageListenerContainer 配置为具有多个消费者(例如 5-10),因此可以同时处理多个消息。我查看了很多 ActiveMQ 和 Spring API 文档,我认为我需要做的就是将 maxConcurrentConsumers 设置为 10 + 将 concurrentConsumers 设置为 5 或在 DefaultMessageListenerContainer 上将并发设置为 5-10。一旦我这样做了,我可以从 ActiveMQ 的内置控制台看到我的队列确实有 5 个消费者。但是当我在队列中放置 10 或 100 条消息时,处理似乎是单线程的,我添加了一个日志行来打印线程 ID,它似乎是相同的线程 ID 按顺序处理所有请求。从控制台上的 ActiveMQ 的队列页面,我单击浏览活动消费者链接以查看正在发生的事情,看起来一个消费者有所有 100 条消息待处理,而其他 4 条则没有。
我做了一些研究,发现这篇文章来自 Spring ( http://forum.springsource.org/showthread.php?61170-Messages-missed-using-DefaultMessageListenerContainer) 并添加了一个值为 2 的预取策略,认为每个消费者都在为自己注册 1000 条消息。现在,当我发送另一批消息时,一个消费者将有 2-3 条消息待处理,但其他 4 个消费者保持空闲状态,并且最终由该消费者按顺序处理所有内容。在这一点上,我想可能是我在 ActiveMQ 代理上配置错误。我在文档中读到默认调度策略是循环策略,但我在我的 activemq.xml 中看到一个名为 constantPendingMessageLimitStrategy 的设置设置为 1000,并尝试将其设置为非常低的数字(例如 2),认为它控制了多少消息经纪人一次发送给消费者,但仍然没有做任何事情。希望有人能指出我做错了什么,我' 我已经在下面发布了我的 spring 配置,除了尝试一个设置(constantPendingMessageLimitStrategy)之外,我真的没有接触过 activemq.xml。我正在使用 ActiveMQ 5.8。
<bean id="importRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay" value="15000" />
<property name="maximumRedeliveries" value="-1" />
<property name="useExponentialBackOff" value="true" />
<property name="backOffMultiplier" value="2" />
</bean>
<bean id="importPrefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="all" value="2"></property>
</bean>
<bean id="importConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${import.queue.url}"/>
<property name="redeliveryPolicy" ref="importRedeliveryPolicy" />
<property name="prefetchPolicy" ref="importPrefetchPolicy"></property>
</bean>
<bean id="importQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${import.queue.name}" />
</bean>
<bean id="importListener" class="com.mycompany.ImportQueueListener" >
<property name="importService" ref="importService"></property>
<property name="sessionFactory" ref="sessionFactory"/>
</bean>
<bean id="importJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="importConnectionFactory" />
<property name="destination" ref="importQueue" />
<property name="messageListener" ref="importListener" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="10"></property>
<property name="concurrentConsumers" value="5"></property>
</bean>