5

我有一个发布 - 订阅用例,我想在发布端阻止,直到每个订阅者确认他们已完成对发布者发送的消息的处理。

我(错误地?)假设我可以使用 RabbitMQ 及其 Java amqp-client 的 Channel.waitForConfirmsOrDie 方法作为我的解决方案的一部分。问题是我还没有发现 waitForConfirmsOrDie 实际上会阻塞的情况。

根据javadocs, waitForConfirmsOrDie 应该:

等到自上次调用以来发布的所有消息都已被代理确认或确认。如果有任何消息被取消,waitForConfirmsOrDie 将抛出 IOException。在非确认通道上调用时,它将立即返回。

为了测试这个方法是否真的有效,我从 RabbitMQ 网站上的这个示例代码开始。

示例代码创建了一个发布者和一个消费者,每个都在自己的单独线程上。然后发布者将消息发送到交换,而消费者消费消息。似乎发布者应该阻止,直到所有消息都通过调用 waitForConfirmsOrDie() 得到确认。

这个示例代码似乎与我想要做的完全匹配。但是,它似乎不像我想象的那样工作。事实上,如果在消费者线程中,我关闭了自动确认消息,那么 waitForConfirmsOrDie() 仍然会立即返回。

我通过将一个 false 更改为 true 来关闭自动确认: ch.queueDeclare(QUEUE_NAME, false, false, false, null); 变为 ch.queueDeclare(QUEUE_NAME, true, false, false, null);(第二个参数 false 而不是 true)。我相信这意味着消费者不应再发送确认信息。

那么 waitForConfirmsOrDie() 实际上做了什么?什么时候会阻塞?

如果 waitForConfirmsOrDie 没有做我想做的事,有没有办法让发布者等到所有订阅者都确认消息后再继续?

4

1 回答 1

11

据我了解,这些电话不应该等待消费者的确认。方法的目的waitForConfirms*是确保您的消息被传递到代理并提供基本的传递/失败类型的通知。换句话说,如果 rmq 节点之一(或什至所有节点)失败/不可用,消息不会在不通知产品的情况下消失。

如果您在通话前断开或关闭 rmq,您可以看到此异常basicPublish

于 2013-02-04T08:23:57.393 回答