我有一个 jms 生产者,它每秒生成许多消息,这些消息被发送到 amq 持久队列并由单个消费者消费,需要按顺序处理它们。但似乎生产者比消费者快得多,而且我遇到了性能和内存问题。消息的获取速度非常非常慢,并且消费似乎是间隔发生的(消费者以轮询方式“询问”消息,这很奇怪?!)
基本上,一切都发生在 spring 集成中。这是生产者端的配置。第一个质押消息进入stakesInMemoryChannel,从那里,它们被过滤并抛出filteredStakesChannel,然后它们从那里进入jms队列(使用执行器,因此发送将在单独的线程中发生)
<bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="${jms.stakes.queue.name}" />
</bean>
<int:channel id="stakesInMemoryChannel" />
<int:channel id="filteredStakesChannel" >
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<bean id="stakeFilterService" class="cayetano.games.stake.StakeFilterService"/>
<int:filter
input-channel="stakesInMemoryChannel"
output-channel="filteredStakesChannel"
throw-exception-on-rejection="false"
expression="true"/>
<jms:outbound-channel-adapter channel="filteredStakesChannel" destination="stakesQueue" delivery-persistent="true" explicit-qos-enabled="true" />
<task:executor id="taskExecutor" pool-size="100" />
另一个应用程序正在使用这样的消息......消息来自 jms stakesQueue 的 stakesInputChannel,之后它们被路由到 2 个单独的通道,一个保留消息,另一个做一些其他的事情,我们称之为“处理” .
<bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="${jms.stakes.queue.name}" />
</bean>
<jms:message-driven-channel-adapter
channel="stakesInputChannel"
destination="stakesQueue"
acknowledge="auto"
concurrent-consumers="1"
max-concurrent-consumers="1"
/>
<int:publish-subscribe-channel id="stakesInputChannel" />
<int:channel id="persistStakesChannel" />
<int:channel id="processStakesChannel" />
<int:recipient-list-router
id="customRouter"
input-channel="stakesInputChannel"
timeout="3000"
ignore-send-failures="true"
apply-sequence="true"
>
<int:recipient channel="persistStakesChannel"/>
<int:recipient channel="processStakesChannel"/>
</int:recipient-list-router>
<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="queuePrefetch" value="${jms.broker.prefetch.policy}" />
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.broker.url}" />
<property name="prefetchPolicy" ref="prefetchPolicy" />
<property name="optimizeAcknowledge" value="true" />
<property name="useAsyncSend" value="true" />
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>