我的 Java 应用程序将消息发送到 RabbitMQ 交换器,然后交换器将消息重定向到绑定队列。我将 Springframework AMQP java 插件与 RabbitMQ 一起使用。
问题:消息进入队列,但它停留在“未确认”状态,它永远不会变成“就绪”。
可能是什么原因?
我的 Java 应用程序将消息发送到 RabbitMQ 交换器,然后交换器将消息重定向到绑定队列。我将 Springframework AMQP java 插件与 RabbitMQ 一起使用。
问题:消息进入队列,但它停留在“未确认”状态,它永远不会变成“就绪”。
可能是什么原因?
Unacknowledged 消息意味着它已被您的消费者读取,但消费者从未向 RabbitMQ 代理发送回 ACK 表示它已完成处理。
我对 Spring Framework 插件并不太熟悉,但是在某个地方(对于您的消费者)您将声明您的队列,它可能看起来像这样(取自http://www.rabbitmq.com/tutorials/tutorial-two -java.html ):
channel.queueDeclare(queueName, ....)
然后你将设置你的消费者
bool ackMode = false;
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, ackMode, consumer);
上面的 ackMode 是一个布尔值,通过将其设置为 false,我们明确地告诉 RabbitMQ,我的消费者将确认给定的每条消息。如果此标志设置为 true,那么您将不会在 RabbitMQ 中看到 Unacknowledged 计数,而是在消费者读取消息后(即,它已交付给消费者,它将从队列中删除)。
要确认消息,您将执行以下操作:
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//...do something with the message...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //the false flag is to do with multiple message acknowledgement
如果您可以发布一些消费者代码,那么我可能会提供进一步的帮助......但同时具体看一下BlockingQueueConsumer:您将看到的构造函数可以设置AcknowledgeMode并查看nextMessage() 这将返回一个 Message 对象,其中包含一个名为 getDeliveryTag() 的方法 这将返回一个 Long ,这是您将在 basicAck 上发回的 ID
只是为了让消息保持未确认状态的另一个可能原因加上我的 2 美分,即使消费者确保使用 basicAck 方法 -
有时,具有打开的 RabbitMQ 连接的进程的多个实例保持运行,其中一个可能导致消息卡在未确认状态,从而阻止消费者的另一个实例重新获取此消息。
您可以访问 RabbitMQ 管理控制台(对于本地计算机,这应该在 localhost:15672 可用),并检查是否有多个实例占用了通道,或者当前是否只有一个实例处于活动状态:
找到多余的正在运行的任务(在本例中为 java)并终止它。移除恶意进程后,您应该会看到消息再次跳转到就绪状态。