我编写了一个 Java 应用程序,它将消息发送到 RabbitMQ,然后由 Flume 从 RabbitMQ 队列中获取消息。我希望除了 Flume 之外,没有其他消费者从该队列中取走消息。
我的应用程序使用 Spring AMQP Java 插件。
问题:
使用下面的代码,消息进入 RabbitMQ 队列后会一直停留在“Unacked”(未确认)状态。据我了解,RabbitMQ 正在等待来自 MessageListener 的 ACK,但 MessageListener 永远不会发送 ACK。有人知道如何解决吗?
代码:
/**
 * Spring bean definitions for the RabbitMQ side of the application: a listener
 * container that consumes from a queue, and a {@code RabbitTemplate} registered
 * under the bean name "myTemplate".
 *
 * NOTE(review): activityLogsQueue(), MyMessageListener() and MyMessageConverter()
 * are invoked as methods but are not defined in this snippet — presumably factory
 * or @Bean methods that were elided; confirm.
 */
public class MyAmqpConfiguration {
// Injected connection factory; used by both the listener container and the template below.
@Autowired
ConnectionFactory connectionFactory;
/**
 * Creates a listener container bound to the queue returned by activityLogsQueue(),
 * dispatching to the listener returned by MyMessageListener() with 3 concurrent consumers.
 *
 * NOTE(review): registering this container makes the application itself a consumer
 * of that queue. If this is the queue that only Flume is supposed to read
 * (TODO confirm), deliveries taken by this container would presumably show up as
 * unacknowledged on the broker until the listener returns.
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(activityLogsQueue());
container.setMessageListener(MyMessageListener());
container.setConcurrentConsumers(3);
return container;
}
/**
 * Creates the "myTemplate" RabbitTemplate on the shared connection factory and
 * installs the converter returned by MyMessageConverter() for outgoing messages.
 */
@Bean(name="myTemplate")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(MyMessageConverter());
return template;
}
}
/**
 * Message listener that delegates every delivered message to a
 * {@code MyMessageHandler}.
 *
 * NOTE(review): {@code onMessage} calls {@code doThings()} on the handler and
 * ignores both the incoming {@code Message} and the stored converter — confirm
 * this is intended.
 */
public class MyMessageListener implements MessageListener {

    // Kept from the constructor; not read by onMessage in this class.
    private final MessageConverter converter;

    // Delegate invoked once per delivered message.
    private final MyMessageHandler<MyObject> messageHandler;

    /**
     * @param converter      message converter stored on this listener
     * @param messageHandler handler invoked for each delivered message
     */
    public MyMessageListener(MessageConverter converter, MyMessageHandler<MyObject> messageHandler) {
        this.converter = converter;
        this.messageHandler = messageHandler;
    }

    /**
     * Invokes the handler for a delivered message.
     *
     * @param message the delivered message (not inspected here)
     */
    @Override
    public void onMessage(Message message) {
        this.messageHandler.doThings();
    }
}
/**
 * Handler that publishes an object to RabbitMQ through the "myTemplate"
 * RabbitTemplate.
 *
 * NOTE(review): this class declares no supertype and no type parameter, yet
 * handleMessage is marked @Override and MyMessageListener refers to it as
 * MyMessageHandler<MyObject> and calls doThings(), which is not defined here —
 * presumably an interface/base class was elided; confirm.
 */
public class MyMessageHandler {
// Template injected by bean name "myTemplate".
@Autowired
@Qualifier("myTemplate")
RabbitTemplate template;
/**
 * Converts and sends the given object via the template.
 *
 * NOTE(review): exchange and routingKey are not declared in this snippet —
 * presumably fields defined elsewhere; verify which exchange/queue they target.
 *
 * @param thing the object to publish
 */
@Override
public void handleMessage(MyObject thing) {
template.convertAndSend(exchange, routingKey, thing);
}
}
/**
 * JsonMessageConverter subclass that customizes outbound message creation and
 * rejects inbound conversion: fromMessage always throws.
 */
public class MyMessageConverter extends JsonMessageConverter {
/**
 * Builds the outgoing Message for the given object.
 *
 * NOTE(review): the body is elided in this snippet; as written it contains no
 * return statement.
 */
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
//do things
}
/**
 * Inbound conversion is not supported by this converter.
 *
 * @throws UnsupportedOperationException always, with this class's name in the message
 */
@Override
public Object fromMessage(Message message) throws MessageConversionException {
throw new UnsupportedOperationException("fromMessage is not supported in "+this.getClass().getName());
}
}