我有一个基于 Spring JMS 和 Active MQ (5.6) 的系统构建,它有大约 12 个 Spring 默认消息侦听器容器(每个最多 20 个并发实例),它们都连接到同一个活动 mq 目标(队列)。
系统通过每个处理程序(容器)从发送给自己的队列中获取消息,使用选择器,完成其工作,然后将消息放回队列中,直到所有工作完成。
我正在做一个基准测试,发送 25,000 条消息,每条消息都需要通过 9 个不同的处理程序。
每次我运行测试时,只有大约 11300 条消息通过我的所有处理程序,但活动 MQ 不再发送任何消息。
在我当前的测试结束时,我可以看到我的队列的以下统计信息: Enqueue Count: 120359 Dequeue Count: 106693 Dispatch Count: 106693 Inflight Count: 0 Queue Size: 13666
除非我重新启动代理,否则 Active MQ 不会再分派消息。
下面是我的 active-mq 配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
<bean id="propertyConfigurer" class="org.springframework.web.context.support.ServletContextPropertyPlaceholderConfigurer" />
<!-- The <broker> element is used to configure the ActiveMQ broker. -->
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="jmsDeployMqBroker" dataDirectory="${java.io.tmpdir}/activemq-data"
destroyApplicationContextOnStop="true" useJmx="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false">
</policyEntry>
<policyEntry queue=">" producerFlowControl="false">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<destinations>
<queue physicalName="handlersDest"/>
<topic physicalName="notificationsDest" />
<queue physicalName="ActiveMQ.DLQ" />
</destinations>
<!-- The managementContext is used to configure how ActiveMQ is exposed
in JMX. By default, ActiveMQ uses the MBean server that is started by the
JVM. For more information, see: http://activemq.apache.org/jmx.html -->
<managementContext>
<managementContext createConnector="false" />
</managementContext>
<!-- Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag). For more information,
see: http://activemq.apache.org/persistence.html -->
<persistenceAdapter>
<amq:kahaPersistenceAdapter directory="${java.io.tmpdir}/activemq-data/kahadb" maxDataFileLength="1g" />
</persistenceAdapter>
<!-- The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html -->
<transportConnectors>
<transportConnector name="openwire" uri="${org.apache.activemq.brokerURL}" />
</transportConnectors>
</broker>
</beans>
这是我的处理程序的弹簧配置示例:
<jee:jndi-lookup id="connectionFactory" jndi-name="${jndi.jms.connfactory}">
<jee:environment>
java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
java.naming.provider.url = ${jndi.jms.naming.url}
</jee:environment>
</jee:jndi-lookup>
<!-- ID must not change as it is used in autowiring the handlers -->
<jee:jndi-lookup id="handlersDest" jndi-name="${jndi.docprod.queue}">
<jee:environment>
java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
java.naming.provider.url = ${jndi.jms.naming.url}
${jndi.queue.setup}
</jee:environment>
</jee:jndi-lookup>
<!-- ID must not change as it is used in autowiring the handlers -->
<jee:jndi-lookup id="notificationsDest" jndi-name="${jndi.docprod.topic}">
<jee:environment>
java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
java.naming.provider.url = ${jndi.jms.naming.url}
${jndi.topic.setup}
</jee:environment>
</jee:jndi-lookup>
<bean id="dmsReadContainer" class="mydomain.DocProdnMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:handlerClass="mydomain.DmsReadHandler"
p:messageListener-ref="dmsReadHandler"
p:destination-ref="handlersDest" >
<property name="concurrentConsumers"><value>${dmsRead.initialInstances}</value></property>
<property name="maxConcurrentConsumers"><value>${dmsRead.maxInstances}</value></property>
<property name="idleConsumerLimit"><value>${dmsRead.idleInstances}</value></property>
</bean>
<bean id="dmsReadHandler" class="mydomain.DmsReadHandler">
</bean>
...
ActiveMQ 的日志文件没有显示任何异常,这表明它停止调度的原因。
有谁知道为什么不会发送更多消息或有任何建议来进一步诊断问题?