4

我试图了解 RabbitMQ 中消息删除的逻辑。

我的目标是即使没有客户端连接来读取它们,也使消息​​持续存在,这样当客户端重新连接时,消息正在等待它们。我可以使用持久的惰性队列,以便将消息持久保存到磁盘,并且我可以使用 HA 复制来确保多个节点获得所有排队消息的副本。

我希望使用主题或标头路由将消息发送到两个或多个队列,并让一个或多个客户端读取每个队列。

我有两个队列,A 和 B,由标头交换提供。队列 A 获取所有消息。队列 B 仅获取带有“归档”标头的消息。队列 A 有 3 个消费者正在阅读。队列 B 有 1 个消费者。如果 B 的消费者死了,但 A 的消费者继续确认消息,RabbitMQ 是删除消息还是继续存储?在 B 重新启动之前,队列 B 不会有任何人使用它,我希望这些消息保持可用以供以后使用。

到目前为止,我已经阅读了很多文档,但仍然没有找到明确的答案。

4

3 回答 3

6

RabbitMQ 将在确认后决定何时删除消息。

假设您有一个消息发件人:

var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "hello",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    string message = "Hello World!";
    var body = Encoding.UTF8.GetBytes(message);

    channel.BasicPublish(exchange: "",
                         routingKey: "hello",
                         basicProperties: null,
                         body: body);
    Console.WriteLine(" [x] Sent {0}", message);
}

这将创建一个持久队列“hello”并发送消息“Hello World!” 给它。这是向队列发送一条消息后的样子。

在此处输入图像描述

现在让我们设置两个消费者,一个确认收到了消息,一个确认没有收到。

channel.BasicConsume(queue: "hello",
                    autoAck: false,
                    consumer: consumer);

channel.BasicConsume(queue: "hello",
                    autoAck: true,
                    consumer: consumer);

如果只运行第一个消费者,则消息永远不会从队列中删除,因为消费者声明只有客户端手动确认消息才会从队列中消失:https://www.rabbitmq.com/confirms。 html

然而,第二个消费者将告诉队列它可以安全地自动/立即删除它收到的所有消息。

如果您不想自动删除这些消息,则必须禁用 autoAck 并使用文档进行一些手动确认:

http://codingvision.net/tips-and-tricks/c-send-data-between-processes-w-memory-mapped-file(向下滚动到“手动确认”)。

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
于 2018-02-15T21:51:59.533 回答
2

简单的答案是,从一个队列中消耗的消息与另一个队列中的消息无关。发布消息后,代理会根据需要将副本分发到尽可能多的队列 - 但它们是消息的真实副本,就代理而言,从那时起绝对不相关。

排队到持久队列中的消息会一直保留,直到它们被队列上的消费者拉出,并且可以选择确认。

请注意,有特定的队列级和消息级 TTL 设置可能会影响这一点。例如,如果队列有 TTL,并且消费者在过期之前没有重新连接,则队列将连同其所有消息一起消失。类似地,如果一条消息已使用特定的 TTL 排队(也可以将其设置为特定队列上所有消息的默认值),那么一旦该 TTL 通过,该消息将不会传递给消费者。

次要注意如果消息由于 TTL 在队列中过期,它实际上会保留在队列中,直到下一次被传递。

于 2018-02-16T06:17:00.950 回答
1

RabbitMQ 删除消息的方式有多种。他们之中有一些是:

  • 消费者确认后
  • 已达到该队列的生存时间 (TTL)。
  • 到达该队列的消息的生存时间 (TTL)。

最后两点表明 RabbitMQ 允许您为消息和队列设置 TTL(生存时间)。可以通过将x-message-ttl参数设置为queue.declare或通过设置message-ttl 策略来为给定队列设置TTL 。可以通过将x-expires参数设置为queue.declare或设置expires policy来为给定队列设置到期时间。

如果消息在队列中的停留时间超过配置的 TTL,则称该消息已死。这里要注意的重要一点是,路由到不同队列的单个消息可能会在不同的时间死亡,或者有时永远不会在它所在的每个队列中死亡。 一个队列中消息的死亡对其他队列中同一消息的生命没有影响

于 2018-02-21T09:23:37.267 回答