7

可以将消息发布到具有过期 TTL 的 RabbitMQ 队列中:一旦 TTL 完成并且(如果设置了死信队列),此类消息将过期并删除到死信队列。

但是是否可以使用 Celery 指定这样的每条消息 TTL?

请注意,我不是在寻找一种方法来指定任务到期,而是在寻找消息到期:我希望我的消息在最终被@死信队列接收之前在队列中花费(可配置的)时间量。

TIA。

4

1 回答 1

1

简短介绍:过期与过期

RabbitMQ 确实支持每个消息的 TTL(以及队列的 TTL),行为记录在这里:https ://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers 。诀窍是在消息发布时(以毫秒为单位)设置expiration消息属性(https://www.rabbitmq.com/publishers.html#message-properties)。

另一方面,Celery 允许您以秒或日期时间的形式设置expires参数(https://docs.celeryproject.org/en/stable/reference/celery.app.task.html)。与原生 RabbitMQ 功能的不同之处在于,消息在过期后仍保留在队列中。过期的消息被传递给worker,worker然后读取expires头来判断消息已经过期并拒绝该消息。

tl;博士:expiration != expires

如何在 Celery 中传递消息属性

这种方法在 Celery 中没有记录。我通过反复试验找到了答案,因为我自己想要一个原生 TTL。

例如由 调用的send_task方法 ( )接受参数。然后将 Celery 未知的所有内容作为消息属性传递到方法中。celery.app.base.Celery.send_taskapply_async**options**optionscelery.app.amqp.Queues->send_task_message( ... )**kwargs

所以如果我们可以设置消息属性,没有什么比设置原生过期更简单的了:

my_awesome_task.apply_async(args=(11,), expiration=42)

RabbitMQ 管理控制台

  • 请注意,Celery 会自动将 42 秒转换为 42000 毫秒(这是正确的)。
  • 过期(在属性中)和过期(在标题中)可以组合在一起,这两个功能不会受到任何影响。
于 2022-01-25T12:44:33.047 回答