我有一个发布 - 订阅用例,我想在发布端阻止,直到每个订阅者确认他们已完成对发布者发送的消息的处理。
我(错误地?)假设我可以使用 RabbitMQ 及其 Java amqp-client 的 Channel.waitForConfirmsOrDie 方法作为我的解决方案的一部分。问题是我还没有发现 waitForConfirmsOrDie 实际上会阻塞的情况。
根据javadocs, waitForConfirmsOrDie 应该:
等到自上次调用以来发布的所有消息都已被代理确认或确认。如果有任何消息被取消,waitForConfirmsOrDie 将抛出 IOException。在非确认通道上调用时,它将立即返回。
为了测试这个方法是否真的有效,我从 RabbitMQ 网站上的这个示例代码开始。
示例代码创建了一个发布者和一个消费者,每个都在自己的单独线程上。然后发布者将消息发送到交换,而消费者消费消息。似乎发布者应该阻止,直到所有消息都通过调用 waitForConfirmsOrDie() 得到确认。
这个示例代码似乎与我想要做的完全匹配。但是,它似乎不像我想象的那样工作。事实上,如果在消费者线程中,我关闭了自动确认消息,那么 waitForConfirmsOrDie() 仍然会立即返回。
我通过将一个 false 更改为 true 来关闭自动确认:
ch.queueDeclare(QUEUE_NAME, false, false, false, null);
变为
ch.queueDeclare(QUEUE_NAME, true, false, false, null);
(第二个参数 false 而不是 true)。我相信这意味着消费者不应再发送确认信息。
那么 waitForConfirmsOrDie() 实际上做了什么?什么时候会阻塞?
如果 waitForConfirmsOrDie 没有做我想做的事,有没有办法让发布者等到所有订阅者都确认消息后再继续?