我正在处理一个大容量流 ~ 每秒 500+ 条消息,使用具有 10 个并发消费者的 SimpleMessageListenerContainer 从 Spring AMQP+Rabbit 消耗数据,我必须每 15 分钟对 Db 进行一些检查并重新加载某些属性以进行处理,这是通过石英触发器完成的,该触发器每 15 分钟触发一次,停止 SimplelistenerContainer,完成必要的工作并再次启动 Container。
当应用程序启动时一切正常,当触发器触发并且容器重新启动时,我看到多次传递相同的消息,这会导致很多重复。消费者没有抛出任何异常。
消息监听器
class RoundRobinQueueListener implements MessageListener {
@Override
public void onMessage(Message message) { //do processing
}
}
在应用启动期间设置并行消费者并启动消费者
final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance();
messageListenerContainer.setQueueNames(queueName);
messageListenerContainer.setMessageListener(roundRobinListener);
messageListenerContainer.setConcurrentConsumers(10);
messageListenerContainer.setChannelTransacted(true);
石英触发器
void execute(JobExecutionContext context) throws JobExecutionException {
messageListenerContainer.stop()
//Do db task, other processing
messageListenerContainer.start()
}