如何在不知道task_id
每个任务的情况下删除所有待处理任务?
10 回答
对于芹菜 3.0+:
$ celery purge
要清除特定队列:
$ celery -Q queue_name purge
对于 Celery 2.x 和 3.x:
例如,当使用带有 -Q 参数的 worker 来定义队列时
celery worker -Q queue1,queue2,queue3
然后celery purge
将不起作用,因为您无法将队列参数传递给它。它只会删除默认队列。解决方案是使用如下参数启动您的工人--purge
:
celery worker -Q queue1,queue2,queue3 --purge
然而,这将运行工人。
其他选项是使用 celery 的 amqp 子命令
celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
在芹菜 3+ 中:
命令行:
$ celery -A proj purge
以编程方式:
>>> from proj.celery import app
>>> app.control.purge()
http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks
我发现这celery purge
不适用于我更复杂的 celery 配置。我将多个命名队列用于不同的目的:
$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ... # Output sorted, whitespaced for readability
celery 0 2
celery@web01.celery.pidbox 0 1
celery@web02.celery.pidbox 0 1
apns 0 1
apns@web01.celery.pidbox 0 1
analytics 1 1
analytics@web01.celery.pidbox 0 1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0 0 1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1 0 1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54 0 1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866 0 1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99 0 1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e 0 1
第一列是队列名称,第二列是队列中等待的消息数,第三列是该队列的侦听器数。队列是:
- celery - 标准、幂等 celery 任务的队列
- apns - Apple 推送通知服务任务的队列,不完全是幂等的
- 分析 - 为长时间运行的夜间分析排队
- *.pidbox - 工作命令队列,例如关闭和重置,每个工作人员一个(2 个 celery 工作人员,一个 apns 工作人员,一个分析工作人员)
- bcast.* - 广播队列,用于向所有监听队列的工作人员发送消息(而不仅仅是第一个获取它的工作人员)
- celeryev.* - Celery 事件队列,用于报告任务分析
分析任务是一项蛮力任务,在小型数据集上效果很好,但现在需要超过 24 小时才能处理。有时,会出现问题,它会卡在等待数据库上。它需要重新编写,但在那之前,当它卡住时,我会终止任务,清空队列,然后重试。我通过查看分析队列的消息计数来检测“卡顿”,该计数应该是 0(已完成分析)或 1(等待昨晚的分析完成)。2 或更高是不好的,我会收到一封电子邮件。
celery purge
提供从一个广播队列中删除任务,我没有看到选择不同命名队列的选项。
这是我的过程:
$ sudo /etc/init.d/celeryd stop # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
如果您想删除所有待处理的任务以及活动和保留的任务以完全停止 Celery,这对我有用:
from proj.celery import app
from celery.task.control import inspect, revoke
# remove pending tasks
app.control.purge()
# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
tasks = jobs[hostname]
for task in tasks:
revoke(task['id'], terminate=True)
# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
tasks = jobs[hostname]
for task in tasks:
revoke(task['id'], terminate=True)
在芹菜 3+
http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks
命令行界面
清除命名队列:
celery -A proj amqp queue.purge <queue name>
清除配置的队列
celery -A proj purge
我已清除消息,但队列中仍有消息?答:任务在实际执行后立即得到确认(从队列中删除)。Worker 收到任务后,需要一段时间才能真正执行,特别是如果有很多任务已经在等待执行。未确认的消息由工作程序保留,直到它关闭与代理(AMQP 服务器)的连接。当该连接关闭时(例如,因为工作人员已停止),代理将重新发送任务到下一个可用工作人员(或重新启动时的同一个工作人员),因此要正确清除等待任务的队列,您必须停止所有工作人员,然后使用 celery.control.purge() 清除任务。
因此,要清除整个队列,必须停止工作人员。
celery 4+ celery purge 命令清除所有配置的任务队列
celery -A *APPNAME* purge
以编程方式:
from proj.celery import app
app.control.purge()
所有待处理的任务将被清除。参考:celerydoc
对于以 RabbitMQ 作为代理的 Celery 版本 5.0+
我们需要先建立从程序到代理的新连接,并将连接与队列绑定以进行清除。
# proj/celery.py
from celery import Celery
app = Celery('proj')
from proj.celery import app
queues = ['queue_A', 'queue_B', 'queue_C']
with app.connection_for_write() as conn:
conn.connect()
for queue in queues:
count = app.amqp.queues[queue].bind(conn).purge()
print(f'Purge {queue} with {count} message(s)')
1.要正确清除等待任务的队列,您必须停止所有工作人员(http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-仍然消息留在队列中):
$ sudo rabbitmqctl stop
或者(如果 RabbitMQ/消息代理由 Supervisor 管理):
$ sudo supervisorctl stop all
2. ...然后从特定队列中清除任务:
$ cd <source_dir>
$ celery amqp queue.purge <queue name>
3.启动RabbitMQ:
$ sudo rabbitmqctl start
或者(如果 RabbitMQ 由 Supervisor 管理):
$ sudo supervisorctl start all