可以将消息发布到具有过期 TTL 的 RabbitMQ 队列中:一旦 TTL 完成并且(如果设置了死信队列),此类消息将过期并删除到死信队列。
但是是否可以使用 Celery 指定这样的每条消息 TTL?
请注意,我不是在寻找一种方法来指定任务到期,而是在寻找消息到期:我希望我的消息在最终被@死信队列接收之前在队列中花费(可配置的)时间量。
TIA。
可以将消息发布到具有过期 TTL 的 RabbitMQ 队列中:一旦 TTL 完成并且(如果设置了死信队列),此类消息将过期并删除到死信队列。
但是是否可以使用 Celery 指定这样的每条消息 TTL?
请注意,我不是在寻找一种方法来指定任务到期,而是在寻找消息到期:我希望我的消息在最终被@死信队列接收之前在队列中花费(可配置的)时间量。
TIA。
RabbitMQ 确实支持每个消息的 TTL(以及队列的 TTL),行为记录在这里:https ://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers 。诀窍是在消息发布时(以毫秒为单位)设置expiration
消息属性(https://www.rabbitmq.com/publishers.html#message-properties)。
另一方面,Celery 允许您以秒或日期时间的形式设置expires
参数(https://docs.celeryproject.org/en/stable/reference/celery.app.task.html)。与原生 RabbitMQ 功能的不同之处在于,消息在过期后仍保留在队列中。过期的消息被传递给worker,worker然后读取expires头来判断消息已经过期并拒绝该消息。
tl;博士:expiration != expires
这种方法在 Celery 中没有记录。我通过反复试验找到了答案,因为我自己想要一个原生 TTL。
例如由 调用的send_task
方法 ( )接受参数。然后将 Celery 未知的所有内容作为消息属性传递到方法中。celery.app.base.Celery.send_task
apply_async
**options
**options
celery.app.amqp.Queues->send_task_message( ... )
**kwargs
所以如果我们可以设置消息属性,没有什么比设置原生过期更简单的了:
my_awesome_task.apply_async(args=(11,), expiration=42)