3

我正在处理一个大容量流 ~ 每秒 500+ 条消息,使用具有 10 个并发消费者的 SimpleMessageListenerContainer 从 Spring AMQP+Rabbit 消耗数据,我必须每 15 分钟对 Db 进行一些检查并重新加载某些属性以进行处理,这是通过石英触发器完成的,该触发器每 15 分钟触发一次,停止 SimplelistenerContainer,完成必要的工作并再次启动 Container。

当应用程序启动时一切正常,当触发器触发并且容器重新启动时,我看到多次传递相同的消息,这会导致很多重复。消费者没有抛出任何异常。

消息监听器

 class RoundRobinQueueListener implements MessageListener {

@Override
public void onMessage(Message message) { //do processing
     }

  }

在应用启动期间设置并行消费者并启动消费者

final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
       RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance();
        messageListenerContainer.setQueueNames(queueName);
        messageListenerContainer.setMessageListener(roundRobinListener);
        messageListenerContainer.setConcurrentConsumers(10);
        messageListenerContainer.setChannelTransacted(true);

石英触发器

    void execute(JobExecutionContext context) throws JobExecutionException {
    messageListenerContainer.stop()
    //Do db task, other processing
    messageListenerContainer.start()
    }
4

1 回答 1

1

看起来您的消息现在已被消费者确认。如果您没有使用自动确认模式,则需要自己确认消息(这也可以在SimpleMessageListenerContainer中配置)。否则,代理会假定消息未成功处理并尝试再次传递它。

于 2012-11-28T12:04:07.720 回答