我有一个 DefaultMessageListenerContainer,它(在我看来)没有扩大。Container 被定义为侦听一个队列,其中有 100 条消息。
我希望 Container 可以达到任何长度,消息将尽可能快地被消耗(通过观察 maxConcurrentConsumers 配置)。所以我假设有 7 个并发消费者。(从容器启动时的 2 个并发消费者开始)一些日志信息:
activeConsumerCount: 5
concurrentConsumers: 2
scheduledConsumerCount: 5
idleConsumerLimit: 1
idleTaskExecLimit: 1
maxConcurrentConsumers: 7
我的 Spring-config(它的一部分):
<bean id="abstractMessageListenerContainer" class="my.package.structure.LoggingListenerContainer" abstract="true">
<property name="connectionFactory" ref="jmscfCee" />
<property name="maxConcurrentConsumers" value="7"/>
<property name="receiveTimeout" value="100000" />
<property name="concurrentConsumers" value="2" />
</bean>
<bean class="my.package.structure.LoggingListenerContainer" parent="abstractMessageListenerContainer">
<property name="destinationName" value="MY.QUEUE" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<bean id="myMessageListener" class="my.package.structure.ListenerClass"></bean>
我的日志容器
public class LoggingListenerContainer extends DefaultMessageListenerContainer{
private static final Logger logger = Logger
.getLogger(LoggingListenerContainer.class);
@Override
protected void doInvokeListener(MessageListener listener, Message message)
throws JMSException {
logger.info("activeConsumerCount: " + this.getActiveConsumerCount());
logger.info("concurrentConsumers: " + this.getConcurrentConsumers());
logger.info("scheduledConsumerCount: " + this.getScheduledConsumerCount());
logger.info("idleConsumerLimit: " + this.getIdleConsumerLimit());
logger.info("idleTaskExecLimit: " + this.getIdleTaskExecutionLimit());
logger.info("maxConcurrentConsumers: " + this.getMaxConcurrentConsumers());
super.doInvokeListener(listener, message);
}
我的听众班:
public class ListenerClass implements MessageListener {
public void onMessage(Message msg) {
//Do some business function
}
}
有人可以纠正我的配置或给我一些关于我的配置的提示或向我解释容器的方法吗?(如果我误解了什么)
我正在使用 ActiveMQ 进行本地测试(使用 WebSphere MQ 进行生产)——如果它与可伸缩性主题相关。
编辑:
<bean id="jmscfCee" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${jmscfCee.hostName}</value>
</property>
</bean>
<bean id="jmscfCeeCachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory ">
<constructor-arg ref="jmscfCee" />
<property name="sessionCacheSize" value="10" />
</bean>