我正在使用 RabbitMQ,并且我有一个包含电子邮件消息的队列。我的消费者服务使消息出队并尝试发送它们。如果出于任何原因,我的消费者无法发送消息,我想将消息重新排队以再次发送。我意识到我可以做一个 basicNack 并将 requeue 标志设置为 true,但是,我不想无限期地重新排队消息(例如,如果我们的电子邮件系统出现故障,我不想不断地重新排队未发送的消息)。我想定义一个有限的次数,我可以将消息重新排队以再次发送。但是,当我将其出列并发送 nack 时,我无法在电子邮件对象上设置字段。队列中的消息上不存在更新的字段。还有其他方法可以解决这个问题吗?提前致谢。
3 回答
RabbitMQ(以及 AMQP 协议)中没有像重试尝试这样的功能。
实施重试尝试限制行为的可能解决方案:
如果之前没有重新传递消息,则重新传递消息(检查方法
redelivered
上的参数basic.deliver
- 你的库应该有一些接口)并丢弃它,然后在死信交换中捕获,然后以某种方式处理。每次无法处理消息时再次发布它,但设置或增加/减少标题字段,例如
x-redelivered-count
(您可以选择任何您喜欢的名称)。要在这种情况下控制重新传递,您必须检查您设置的字段是否达到某个限制(顶部或底部 - 0 是我的选择,ttl
在 tcp/ip 的 ip 标头中的 a-la)。将消息唯一键(例如 uuid,但您必须在发布消息时手动设置)存储在 Redis、memcache 或其他存储中,甚至在 mysql 中与重新传递计数一起存储,然后在每次重新传递时递增/递减此值,直到达到限制.
(对于真正的极客)编写将实现您想要的此类行为的插件。
#3的优点是重新传递的消息保留在队列头中。如果您排长队或消息顺序对您很重要,这一点很重要(请注意,重新投递会破坏严格的消息顺序,有关详细信息,请参阅官方文档或有关 SO 的此问题)。
PS:
这个主题有类似的答案,但在 php.ini 中。看看它,也许它对你有一点帮助(从“有多种技术来处理循环重新交付问题”这句话开始阅读它。
虽然这是一个老问题,但我认为您现在可以通过死信交换和x-death标头数组的组合轻松做到这一点,一旦消息被死信:
死信进程将一个数组添加到每个死信消息的标题中,名为 x-death。该数组包含每个死信事件的条目,由一对 {queue, reason} 标识。每个这样的条目都是一个包含几个字段的表:
queue:消息在死信之前所在的队列的名称
原因:死字的原因,见下文
time:消息被死信的日期和时间,作为 64 位 AMQP 0-9-1 时间戳
交换 - 消息发布到的交换(请注意,如果消息多次出现死信,这将是一个死信交换)
routing-keys:发布消息时使用的路由密钥(包括 CC 密钥,但不包括 BCC 密钥)
计数:由于这个原因,这条消息在这个队列中被死信的次数
original-expiration(如果消息由于每个消息的 TTL 而被死信):消息的原始过期属性。expire 属性从死信消息中删除,以防止它在路由到的任何队列中再次过期。
阅读这篇精彩的文章以获取更多信息
检查这个平局:
从我的角度来看,更好的想法是在消费者内部实现死信交换和重试逻辑的组合。如果消费者无法处理消息,那么您将消息放入 DeadLetterQueue
您可以在下面找到使用 node-amqp 和 Rabbitmq https://github.com/kharandziuk/dead-letter-exchange-prototype实现的死信交换原型。