4

我编写了 Java 应用程序,它将消息发送到 RabbitMQ。然后 Flume 从 RabbitMQ 队列中获取消息。我很感兴趣没有人从队列中提取消息,除了水槽。

我的应用程序使用 Spring AMQP Java 插件。

问题:

使用下面的代码,消息进入 RabbitMQ 队列并永远保持“未知”。据我了解,RabbitMQ 正在等待来自 MessageListener 的 ACK,但 MessageListener 永远不会 ACK。有人知道如何解决吗?

编码:

public class MyAmqpConfiguration {

    @Autowired
    ConnectionFactory connectionFactory;

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(activityLogsQueue());
    container.setMessageListener(MyMessageListener());
            container.setConcurrentConsumers(3);

    return container;
    }

        @Bean(name="myTemplate")
        public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(MyMessageConverter());
        return template;
        }
}


public class MyMessageListener implements MessageListener {


   public MyMessageListener(MessageConverter converter, MyMessageHandler<MyObject> messageHandler) {
      this.converter = converter;
      this.messageHandler = messageHandler;
    }

   @Override
   public void onMessage(Message message) {
     this.messageHandler.doThings();
   }

}

public class MyMessageHandler  {

     @Autowired
     @Qualifier("myTemplate")
     RabbitTemplate template;

     @Override
     public void handleMessage(MyObject thing) {
         template.convertAndSend(exchange, routingKey, thing);
     }

}


public class MyMessageConverter extends JsonMessageConverter {

    @Override
     protected Message createMessage(Object object, MessageProperties messageProperties) { 
        //do things
     }

     @Override
     public Object fromMessage(Message message) throws MessageConversionException {
         throw new UnsupportedOperationException("fromMessage is not supported in "+this.getClass().getName());
     }


}
4

2 回答 2

4

如果您不想确认每条消息,那么您可以通过执行在 SimpleMessageListenerContainer 上设置 AcknowledgeMode

container.setAcknowledgeMode(AcknowledgeMode.NONE);

查看API 参考以获取更多信息。

更新:应该是AcknowledgeMode.NONE

设置为 AcknowledgeMode.NONE 告诉代理不要期望任何确认,并且它将假定所有消息在发送后立即得到确认(这在本机 Rabbit 代理术语中是“自动确认”)。如果 AcknowledgeMode.NONE 则通道不能是事务性的(因此如果该标志被意外设置,容器将在启动时失败)。

于 2012-08-14T13:38:53.650 回答
0

这是导致阅读解决方案的讨论:

http://forum.springsource.org/showthread.php?129304-Spring-AMQP-would-like-to-put-message-to-queue-and-send-ACK-immediately&p=422064&posted=1#post422064

于 2012-08-14T16:28:43.987 回答