118

如何检查一项任务是否在 celery 中运行(具体来说,我使用的是 celery-django)?

我已经阅读了文档,并在谷歌上搜索过,但我看不到这样的电话:

my_example_task.state() == RUNNING

我的用例是我有一个用于转码的外部(java)服务。当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它。

我相信我正在使用当前的稳定版本 - 2.4。

4

13 回答 13

115

返回 task_id(由 .delay() 给出),然后向 celery 实例询问状态:

x = method.delay(1,2)
print x.task_id

询问时,使用此 task_id 获取新的 AsyncResult:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
于 2012-01-27T14:41:36.377 回答
92

AsyncResult从任务 ID创建对象常见问题解答中推荐的获取任务状态的方法,当您唯一拥有的是任务 ID 时。

然而,从 Celery 3.x 开始,如果人们不注意它们,有一些重要的警告可能会咬人。这实际上取决于特定的用例场景。

默认情况下,Celery 不记录“运行”状态。

为了让 Celery 记录任务正在运行,您必须设置task_track_startedTrue. 这是一个测试这个的简单任务:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

什么时候task_track_startedFalse,这是默认的,PENDING即使任务已经开始,状态也会显示。如果设置task_track_startedTrue,则状态将为STARTED

状态的PENDING意思是“我不知道”。

一个AsyncResultwith statePENDING并不意味着 Celery 不知道任务的状态。这可能是由于多种原因。

一方面,AsyncResult可以使用无效的任务 ID 构建。此类“任务”将被 Celery 视为待处理:

>>> task.AsyncResult("invalid").status
'PENDING'

好的,所以没有人会将明显无效的 id 提供给AsyncResult. 很公平,但它也有效果,它AsyncResult也会考虑一个成功运行但 Celery 忘记的任务PENDING同样,在某些用例场景中,这可能是一个问题。部分问题取决于如何配置 Celery 以保留任务结果,因为它取决于结果后端中“墓碑”的可用性。(“墓碑”是 Celery 文档中用于记录任务如何结束的数据块的术语。)如果is ,则使用AsyncResult根本不起作用。一个更令人烦恼的问题是 Celery 默认会过期墓碑。这task_ignore_resultTrueresult_expires默认设置为 24 小时。因此,如果您启动一个任务,并将 id 记录在长期存储中,并在 24 小时后,您AsyncResult使用它创建一个,状态将为PENDING.

所有“真正的任务”都从PENDING状态开始。因此,PENDING完成一项任务可能意味着该任务已被请求但从未比这更进一步(无论出于何种原因)。或者这可能意味着任务运行但 Celery 忘记了它的状态。

哎哟! AsyncResult不会为我工作。我还可以做些什么?

我更喜欢跟踪目标而不是跟踪任务本身。我确实保留了一些任务信息,但这对于跟踪目标来说确实是次要的。目标存储在独立于 Celery 的存储中。当一个请求需要执行计算取决于某个目标是否已经实现时,它会检查该目标是否已经实现,如果是,则使用这个缓存的目标,否则启动将影响该目标的任务,并发送到发出 HTTP 请求的客户端响应指示它应该等待结果。


上面的变量名和超链接适用于 Celery 4.x。在 3.x 中,对应的变量和超链接是:CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.

于 2016-07-08T13:30:40.343 回答
69

每个Task对象都有一个.request属性,其中包含它的AsyncRequest对象。因此,以下行给出了 Task 的状态task

task.AsyncResult(task.request.id).state
于 2012-01-28T14:59:57.713 回答
17

您还可以创建自定义状态并在任务执行期间更新其值。这个例子来自文档:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

于 2015-01-03T04:47:54.903 回答
17

老问题,但我最近遇到了这个问题。

如果您尝试获取 task_id,您可以这样做:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

现在您确切知道 task_id 是什么,现在可以使用它来获取 AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
于 2016-07-10T01:35:14.690 回答
11

只需使用芹菜常见问题解答中的此 API

result = app.AsyncResult(task_id)

这工作正常。

于 2018-11-29T06:33:10.690 回答
2

2020年的答案:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
于 2020-03-18T09:20:44.213 回答
0

尝试:

task.AsyncResult(task.request.id).state

这将提供芹菜任务状态。如果 Celery Task 已经处于FAILURE状态,它将抛出异常:

raised unexpected: KeyError('exc_type',)

于 2016-05-07T05:44:33.290 回答
0
  • 首先,在你的celery APP中:</li>

vi my_celery_apps/app1.py

app = Celery(worker_name)
  • 接下来,切换到任务文件,从你的 celery app 模块中导入 app。

vi 任务/task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

于 2019-09-24T07:48:17.410 回答
0

我在

芹菜项目工人指南检查工人

就我而言,我正在检查 Celery 是否正在运行。

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

您可以使用检查来满足您的需求。

于 2017-07-12T22:53:20.900 回答
-1
res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())
于 2020-07-26T03:38:34.277 回答
-1

对于简单的任务,我们可以使用http://flower.readthedocs.io/en/latest/screenshots.htmlhttp://policystat.github.io/jobtastic/来做监控。

对于复杂的任务,比如说一个处理很多其他模块的任务。我们建议手动记录特定任务单元的进度和消息。

于 2017-03-31T06:18:29.470 回答
-3

除了上述程序化方法之外,还可以很容易地看到使用 Flower Task 的状态。

使用 Celery Events 进行实时监控。Flower 是一个基于 Web 的工具,用于监控和管理 Celery 集群。

  1. 任务进度和历史
  2. 能够显示任务详细信息(参数、开始时间、运行时等)
  3. 图表和统计

官方文档: Flower - Celery 监控工具

安装:

$ pip install flower

用法:

http://localhost:5555

更新:这与版本控制有问题,花(版本= 0.9.7)仅适用于芹菜(版本= 4.4.7),当您安装花时,它将您的芹菜更高版本卸载到4.4.7中,这永远不会适用注册任务

于 2018-06-05T17:13:19.797 回答