对于我当前的项目,我需要使用来自许多目的地(从数百到 20 或 30k)的消息,所有目的地都是主题。目前(对于初始负载测试)所有消息都是在同一台服务器上本地创建的,在线程池中。
我当前的 spring 配置在代理网络(用于集群)和 DefaultMessageListenerContainers (DMLCs) 中使用嵌入式 activemq,并带有一个通用的 TaskExecutor。虽然目的地的数量非常多,但每个目的地的吞吐量相对较低。
我唯一的要求是尽快消费所有消息。
我的配置:
<bean id="connectionfactory" class="org.springframework.jms.connection.CachingConnectionFactory" destroy-method="destroy">
<property name="targetConnectionFactory">
<ref bean="amqConnectionFactory" />
</property>
</bean>
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost:61616?async=false&jms.dispatchAsync=false" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
</bean>
<bean id="listenerThreadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="70" />
<property name="maxPoolSize" value="70" />
<property name="daemon" value="true" />
<property name="keepAliveSeconds" value="60" />
</bean>
<!-- Message Listener Container Template for Topics -->
<bean id="topiccontainertemplate" class="org.springframework.jms.listener.DefaultMessageListenerContainer" scope="prototype"
destroy-method="destroy">
<property name="autoStartup" value="false" />
<property name="connectionFactory" ref="connectionfactory" />
<property name="pubSubDomain" value="true" />
<property name="cacheLevelName" value="CACHE_CONSUMER" />
<property name="destinationName" value="default" />
<property name="maxMessagesPerTask" value="1" />
<property name="receiveTimeout" value="1" />
<property name="taskExecutor" ref="listenerThreadPoolTaskExecutor" />
</bean>
我的代码使用应用程序上下文作为 DMLC-Factory 并设置容器的最终配置:
AbstractMessageListenerContainer container = context.getBean("simpletopiccontainertemplate", AbstractMessageListenerContainer.class);
container.setDestinationName(localEntity.getId().getDestination());
container.setMessageListener(mylistener);
container.start();
虽然我们在此配置中不会丢失消息,但单个消息的转换时间可能会很长。
Q1:有没有更有效的方式来收听大量目的地?
Q2:我的监听器配置有可能改进吗?
Q3:除了 DMLC,我还尝试了 SimpleMessageListenerContainer,但我无法让它工作。我的配置有问题吗?
<bean id="simpletopiccontainertemplate" class="org.springframework.jms.listener.SimpleMessageListenerContainer" scope="prototype"
destroy-method="destroy">
<property name="autoStartup" value="false" />
<property name="connectionFactory" ref="connectionfactory" />
<property name="destinationName" value="default" />
<property name="concurrency" value="1" />
<property name="pubSubDomain" value="true" />
<property name="taskExecutor" ref="listenerThreadPoolTaskExecutor" />
</bean>