1

我有一个 jms 生产者,它每秒生成许多消息,这些消息被发送到 amq 持久队列并由单个消费者消费,需要按顺序处理它们。但似乎生产者比消费者快得多,而且我遇到了性能和内存问题。消息的获取速度非常非常慢,并且消费似乎是间隔发生的(消费者以轮询方式“询问”消息,这很奇怪?!)

基本上,一切都发生在 spring 集成中。这是生产者端的配置。第一个质押消息进入stakesInMemoryChannel,从那里,它们被过滤并抛出filteredStakesChannel,然后它们从那里进入jms队列(使用执行器,因此发送将在单独的线程中发生)

    <bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="${jms.stakes.queue.name}" />
    </bean>

    <int:channel id="stakesInMemoryChannel" />

    <int:channel id="filteredStakesChannel" >
        <int:dispatcher task-executor="taskExecutor"/>
    </int:channel>

    <bean id="stakeFilterService" class="cayetano.games.stake.StakeFilterService"/>

    <int:filter 
       input-channel="stakesInMemoryChannel"
       output-channel="filteredStakesChannel" 
       throw-exception-on-rejection="false"
       expression="true"/>

    <jms:outbound-channel-adapter channel="filteredStakesChannel" destination="stakesQueue" delivery-persistent="true" explicit-qos-enabled="true"  />

    <task:executor id="taskExecutor" pool-size="100" />

另一个应用程序正在使用这样的消息......消息来自 jms stakesQueue 的 stakesInputChannel,之后它们被路由到 2 个单独的通道,一个保留消息,另一个做一些其他的事情,我们称之为“处理” .

    <bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="${jms.stakes.queue.name}" />
</bean>

<jms:message-driven-channel-adapter 
    channel="stakesInputChannel" 
    destination="stakesQueue" 
    acknowledge="auto"
    concurrent-consumers="1"
    max-concurrent-consumers="1"
    />
<int:publish-subscribe-channel id="stakesInputChannel" />
<int:channel id="persistStakesChannel" />
<int:channel id="processStakesChannel" />

<int:recipient-list-router 
        id="customRouter" 
        input-channel="stakesInputChannel" 
        timeout="3000" 
        ignore-send-failures="true" 
        apply-sequence="true"
        >
    <int:recipient channel="persistStakesChannel"/>
    <int:recipient channel="processStakesChannel"/>
</int:recipient-list-router> 


<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="queuePrefetch" value="${jms.broker.prefetch.policy}" />
</bean>

<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="${jms.broker.url}" />     
            <property name="prefetchPolicy" ref="prefetchPolicy" /> 
            <property name="optimizeAcknowledge" value="true" /> 
            <property name="useAsyncSend" value="true" />
        </bean>
    </property>
    <property name="sessionCacheSize" value="10"/>
    <property name="cacheProducers" value="false"/>
</bean>
4

2 回答 2

1

不确定您所说的“每隔一段时间(消费者以轮询方式“询问”消息,这很奇怪)”是什么意思。

容器线程可能“看起来”像是在轮询,但实际上并非如此;它们在 AMQ 客户端中阻塞,直到消息到达或超时;当超时时,它立即返回到 AMQ 接收()。

这个配置看起来不错;对于一个线程,消耗率将直接取决于您在路由器下游所做的事情。

于 2012-11-21T14:53:23.500 回答
0

建议使用PooledConnectionFactory。这建议与 Spring JmsTemplate 一起使用,它汇集了 Connection、Session 和 MessageProducer 实例,以便在它们不再使用后可以返回它们。

我相信您在消费者方面看到的“间隔”行为是消费者超时。

与 Gary Russel 所说的相反,amq.receive()它确实有效地轮询了队列。Spring 配置隐藏了这一点,但消息基本上是在一个循环中从队列中拉出,该循环调用队列消费者的接收。receive()队列的消费者在调用尝试获取消息之前无法知道消息是否在队列中。

这与主题相反,您在其中注册一个侦听器,该侦听器在消息进入时执行操作。主题是一个优雅的解决方案,因为您注册了一个处理消息的侦听器。

使用 Topic,您告诉 activemq 如何处理消息,使用 Queue,activemq 只是在您请求消息时给您消息。

于 2012-11-23T04:57:09.030 回答