9

celery中,我想获取特定任务名称的所有任务的任务状态。对于尝试下面的代码。

import celery.events.state

# Celery status instance.
stat = celery.events.state.State()

# task_by_type will return list of tasks.
query = stat.tasks_by_type("my_task_name")

# Print tasks.
print query

现在我在这段代码中得到了空列表。

4

3 回答 3

2

celery.events.state.State()是一种用于跟踪 celery 工人和任务状态的数据结构。调用 时State(),您会得到一个没有数据的空状态对象。

您应该使用app.events.Receiver(流处理)或celery.events.snapshot(批处理)来捕获包含任务的状态。

示例代码:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)
于 2018-11-23T13:14:08.597 回答
1

这不是本机支持的。根据后端(Mongo、Redis 等)的不同,您可能能够也可能不能内省队列的内容并找出其中的内容。即使这样做,您也会错过当前正在进行的项目。

也就是说,您可以自己管理:

result = mytask.delay(...)
my_datastore.save("mytask", result.id)
...
for id in my_datastore.find(task="mytask"):
    res = AsyncResult(id)
    print res.state
于 2013-10-01T06:06:37.637 回答
0

在 celery 中,如果您想从其他功能访问它们,您可以通过任务 ID 访问它们来轻松找到任务的状态。

示例代码:-

@task(name='Sum_of_digits')
def ABC(x,y):
   return x+y

添加此任务进行处理

 res = ABC.delay(1, 2)

现在使用任务res获取状态、状态和结果(res.get())

 print(f"id={res.id}, state={res.state}, status={res.status}")
于 2019-06-18T12:03:22.203 回答