24

我创建了一个基于 spring、jms 和 activemq 的简单生产者消费者模拟,我试图从生产者和消费者双方达到高性能,

连接设置:

<tx:annotation-driven />
<bean id="transactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
     <property name="connectionFactory"  ref="connectionFactory" />
</bean>

<amq:connectionFactory id="amqConnectionFactory" brokerURL="failover:(tcp://${broker.url}:61616)"  />

<bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory" />
</bean>

<amq:queue id="queue" physicalName="queue" />

<beans:bean id="jsonMessageConverter" class="XXXXX.converter.JsonMessageConverter" />

消费者设置:

<jms:listener-container concurrency="10"
    acknowledge="auto" prefetch="1" message-converter="jsonMessageConverter" transaction-manager="transactionManager"

    >
    <jms:listener id="queueListener_1" destination="ooIntegrationQueue"
        ref="myMessageListenerAdapter" />
</jms:listener-container>


<beans:bean id="myMessageListenerAdapter"
    class="org.springframework.jms.listener.adapter.MessageListenerAdapter" >
    <beans:property name="delegate" ref="consumer"/>
</beans:bean>


<beans:bean id="consumer" class="XXX.ConsumerImpl"/>

生产者设置:

<beans:bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
    p:connectionFactory-ref="connectionFactory" p:messageConverter-ref="jsonMessageConverter"
    p:defaultDestination-ref="ooIntegrationQueue" p:sessionTransacted="true" />

从消费者开始,我设法每秒消耗大约 25 条消息,这非常慢,我发现瓶颈是我正在使用事务,在谷歌搜索了一下并使用配置后,我发现自动装配 DefaultMessageListenerContainer 并将缓存级别更改为

listenerContainer.setCacheLevelName("CACHE_SESSION") 

我的性能提高到每秒大约 1500 条消息,同时仍然有事务。

我现在的问题是生产者仍然停留在每秒大约 25 次操作,我的生产者测试很简单:

int numOfMessages = getNumberOfMessages();


double startTime = System.currentTimeMillis();

for (int i = 1; i <= numOfMessages; i++) {
    jmsTemplate.convertAndSend("HelloWorld" + i);
}

double endTime = System.currentTimeMillis();

double totalTime=(endTime-startTime)/1000;
System.out.println("Time - "+totalTime+" seconds");
System.out.println("EPS - "+numOfMessages/totalTime);

我想知道如何与制作人达到类似的性能,因为它现在成为整个系统的瓶颈。

4

4 回答 4

16

抱歉,如果这个答案来晚了以帮助原始海报。我最近调查了JmsTemplate性能。即使使用相同的交付和确认模式,本机JMS代码似乎也比JmsTemplate. 问题原来是ActiveMQ通常默认为异步发送,但是当您使用JmsTemplate它时,改为使用同步发送。这会大大降低性能。您可以将ActiveMQConnectionFactory'useAsyncSend属性设置true为强制异步发送。更多细节在这里:JmsTemplate 不是邪恶的

于 2013-01-05T01:47:44.030 回答
6

JMSTemplate 遍历 ConnectionFactiory -> Connection -> Session -> MessageProducer,在每次发送后关闭每个对象。为了解决这个问题,使用 org.apache.activemq.pool.PooledConnectionFactory 包装您的 amqConnectionFactory bean,并在模板下使用它而不是 CachingConnectionFactory。

于 2011-08-15T19:49:42.290 回答
1

尝试将确认方法从 AUTO 更改为 CLIENT_ACKNOWLEDGE。有关更多信息,请查看规范

于 2012-11-28T21:38:47.807 回答
0

ActiveMQ 的默认交付模式是什么?它是一个持久队列吗?如果是这样,它是如何配置的?经纪人有多远?这些答案将通过回答服务器确认发送需要多长时间来确定发送到队列的基本成本(即网络 RTT + 将消息同步持久化到磁盘的潜在成本)。

另一种可能性是您实际上是在每次发送时创建一个新的连接、会话和消息生产者。至少可以说这是相当昂贵的。值得确认这是否正在发生(例如,将调试日志添加到 spring,检查 amq 管理控制台以了解连接流失)是否作为基本的健全性检查。从外观上看,它CachingConnectionFactory应该默认缓存单个会话和消息生产者,并且convertAndSend应该在发送后关闭它获得的会话,这会导致将该缓存的会话返回到池中。这应该意味着在下一次发送时获取缓存会话相对较快(spring jms 通过大量代码只是为了发送消息)。

于 2011-08-15T14:48:54.203 回答