37

我使用 Celery (3.0.15) 和 Redis 作为代理。

是否有一种直接的方法来查询 Celery 队列中存在的具有给定名称的任务数?

而且,作为后续,有没有办法取消 Celery 队列中存在的所有具有给定名称的任务?

我已经阅读了监控和管理指南,但没有找到解决方案。

4

5 回答 5

40
# Retrieve tasks
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
query = celery.events.state.tasks_by_type(your_task_name)

# Kill tasks
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
for uuid, task in query:
    celery.control.revoke(uuid, terminate=True)
于 2013-03-26T16:18:03.667 回答
17

有一个问题是较早的答案没有解决的,如果人们不知道的话,可能会让他们失望。

在已经发布的那些解决方案中,我会使用Danielle 的一个小修改:我会将任务导入我的文件并使用它的.name属性来获取任务名称以传递给.tasks_by_type().

app.control.revoke(
    [uuid for uuid, _ in
     celery.events.state.State().tasks_by_type(task.name)])

但是,此解决方案将忽略那些已安排为将来执行的任务。就像一些评论其他答案的人一样,当我检查.tasks_by_type()返回的内容时,我有一个空列表。事实上,我的队列是空的。但我知道有计划在未来执行的任务,这些是我的主要目标。我可以通过执行看到它们,celery -A [app] inspect scheduled但它们不受上面代码的影响。

我设法通过这样做撤销了计划任务:

app.control.revoke(
    [scheduled["request"]["id"] for scheduled in
     chain.from_iterable(app.control.inspect().scheduled()
                         .itervalues())])

app.control.inspect().scheduled()返回一个字典,其键是工作人员名称,值是调度信息列表(因此,需要chain.from_iterable从 导入itertools)。任务信息在"request"调度信息字段中,"id"包含任务id。请注意,即使撤销后,计划任务仍会显示在计划任务中。被撤销的计划任务不会从计划任务列表中删除,直到它们的计时器到期或直到 Celery 执行一些清理操作。(重新启动工人会触发此类清理。)

于 2015-09-03T18:39:30.517 回答
4

您可以在一个请求中执行此操作:

app.control.revoke([
    uuid
    for uuid, _ in
    celery.events.state.State().tasks_by_type(task_name)
])
于 2015-04-22T04:57:36.267 回答
3

和 Celery 一样,这里没有一个答案对我有用,所以我做了我平常事情,并拼凑了一个直接检查 redis 的解决方案。开始了...

# First, get a list of tasks from redis:
import redis, json

r = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)

# Now import the task you want so you can get its name
from my_django.tasks import my_task

# Now, import your celery app and iterate over all tasks 
# from redis and nuke the ones that have a matching name.
from my_django.celery_init import app
for task in l:
     task_headers = json.loads(task)['headers']
     task_name = task_headers["task"]
     if task_name == my_task.name:
         task_id = task_headers['id']
         print("Terminating: %s" % task_id)
         app.control.revoke(task_id, terminate=True)

请注意,以这种方式撤销可能不会撤销预取的任务,因此您可能不会立即看到结果。

此外,此答案不支持优先任务。如果您想修改它来做到这一点,您将需要我的其他答案中的一些技巧,即破解 redis

于 2021-01-09T18:05:00.503 回答
2

它看起来像flower提供监控:

https://github.com/mher/flower

使用 Celery Events 进行实时监控

任务进度和历史 能够显示任务详细信息(参数、开始时间、运行时间等) 图形和统计数据 远程控制

查看工作器状态和统计信息 关闭和重新启动工作器实例 控制工作器池大小和自动缩放设置 查看和修改工作器实例使用的队列 查看当前正在运行的任务 查看计划任务(ETA/倒计时) 查看保留和撤销的任务 应用时间和速率限制 配置查看器 撤销或终止任务 HTTP API

OpenID 身份验证

于 2013-03-26T16:20:37.743 回答