3

我同时开始生产者和消费者。6 小时后,生产者在队列中生成了大约 6 千万条消息,并在 6 小时后停止了生产者,但消费者仍在持续运行,即使在运行 18 小时后仍有 4 千万条消息在队列中。谁能告诉我为什么消费者的表现很慢?

提前致谢!

@Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(this.queueName);
        container.setMessageListener(new MessageListenerAdapter(new TestMessageHandler(), new JsonMessageConverter()));
        return container;
    }
@Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                "localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(new JsonMessageConverter());
        template.setRoutingKey(this.queueName);
        template.setQueue(this.queueName);
        return template;
    }

    public class TestMessageHandler  {
           // receive messages
        public void handleMessage(MessageBeanTest msgBean) {
                   //  Storing bean data into CSV file
             }
    }
4

2 回答 2

3

根据 Gary 的建议,您可以按如下方式设置它们。查看@RabbitListener

@Bean
public SimpleRabbitListenerContainerFactory listenerContainer(     {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(baseConfig.connectionFactory());
    factory.setConcurrentConsumers(7); // choose a value
    factory.setPrefetchCount(1); // how many messages per consumer at a time
    factory.setMaxConcurrentConsumers(10); // choose a value
    factory.setDefaultRequeueRejected(false); // if you want to deadletter
    return factory;
}
于 2016-08-10T18:37:30.320 回答
2

根据维基百科,千万 == 10,000,000 所以你的意思是 6000 万。

容器只能像您的侦听器那样快速处理消息 - 您需要分析您对每条消息所做的工作。

您还需要尝试容器并发设置(concurrentConsumers)、预取等,以获得最佳性能,但最终它仍然是您的侦听器,占用了大部分处理时间;容器的开销非常小。如果您的侦听器构造不当,增加并发性将无济于事。

如果您使用交易,这将显着减慢消费。

尝试使用对消息不执行任何操作的侦听器。

最后,在提出此类问题时,您应该始终显示配置。

于 2013-09-11T15:03:03.037 回答