我正在使用芹菜和 django-celery。我已经定义了一个我想测试的定期任务。是否可以手动从 shell 运行定期任务以便查看控制台输出?
问问题
56863 次
3 回答
121
您是否尝试过从 Django shell 运行任务?您可以使用.apply
任务的方法来确保它在本地急切地运行。
假设任务my_task
在 Django 应用程序myapp
中的tasks
子模块中调用:
$ python manage.py shell
>>> from myapp.tasks import my_task
>>> eager_result = my_task.apply()
结果实例具有与通常类型相同的 API AsyncResult
,只是结果总是在本地急切地评估,并且该.apply()
方法将阻塞,直到任务运行完成。
于 2012-10-15T16:40:28.987 回答
28
如果您的意思是在不满足条件时触发任务,例如不满足周期时间。您可以分两步完成。
1.获取您的任务ID。
您可以通过键入来完成。
celery inspect registered
你会看到类似app.tasks.update_something
. 如果没有,那可能celery
是没有开始。运行它。
2.运行任务celery call
celery call app.tasks.update_something
有关更多详细信息,只需键入
celery --help
celery inspect --help
celery call --help
于 2018-10-20T04:49:14.913 回答
8
我认为您需要打开两个 shell:一个用于从 Python/Django shell 执行任务,另一个用于运行celery worker
( python manage.py celery worker
)。正如前面的答案所说,您可以使用apply()
或运行任务apply_async()
我已经编辑了答案,因此您没有使用已弃用的命令。
于 2012-10-15T16:42:41.657 回答