1

我有以下配置来使用队列中的消息。我需要确保任务执行器一次只能执行一项任务,因此我也配置了任务执行器,如下所示。

<bean name="jmsTaskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="1" />
    <property name="maxPoolSize" value="2" />
</bean>

当我如上所述配置我的任务 Executor 时,一次消耗 10 条消息(队列中有大量消息流)并且容器停止侦听消息近 10-15 分钟。我的容器配置如下:

<bean id="queueContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachedConnectionFactory" />
    <property name="destination" ref="queue" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="idleTaskExecutionLimit" value="1" />
    <property name="idleConsumerLimit" value="5" />
    <property name="receiveTimeout" value="10000" />
    <property name="recoveryInterval" value="10000" />
    <property name="taskExecutor" ref="jmsTaskExecutor" />
    <property name="messageListener" ref="queueListener" />
    <property name="autoStartup" value="true" />
</bean>

在进行了一些谷歌搜索之后,我尝试使用 SyncTaskExecutor 而不是 ThreadPoolTask​​Executor 并且我已将我的 taskExecutor 配置如下:

<bean name="jmsTaskExecutor"
            class="org.springframework.core.task.SyncTaskExecutor" />

但这会导致tomcat中的内存泄漏。

能否请您告诉我如何才能实现仅在任务完成后才消费消息和处理消息到任务的行为?

队列监听代码如下:

public class QueueListener implements SessionAwareMessageListener<Message>{
 @override
 public void onMessage(Message msg,Session ses) throws JMSException{
 ....
 ....
 ....
 }
}
4

1 回答 1

0

当我如上所述配置我的任务执行器时,一次消耗 10 条消息

这表明您的预取大小为 10。因此,即使您只有一个线程处理消息,该线程一次也会接收 10 条消息。如果您以 0 或 1 的预取大小(取决于代理实现)启动代理,您将获得所需的“一次一条消息”行为。

例如,如果您使用 ActiveMQ ,您可以查看此链接以设置预取大小。

如果您的消息处理需要时间,那么我们需要查看更多您的“onMessage”代码,以了解您可能在哪里花费时间。

于 2013-05-18T21:47:08.887 回答