我有一个具有以下层次结构的 django 项目:
.
├── api_v_1_0
├── celery_tasks
├── db.sqlite3
├── doc
├── manage.py
├── __pycache__
├── rockynode
├── run.sh
├── static
├── staticfiles
└── web
所有 celery 任务都在celery_tasks
应用程序中:
celery_tasks
├── apps.py
├── emitter
├── __init__.py
├── mongo
├── __pycache__
├── rockynodemail
├── test
模块是新添加的test
模块,任务如下:
from rockynode.celerys import celery_app_basic
class test:
@celery_app_basic.task(queue='General')
def s(x):
return "{}"
测试模块主要包含两个文件:
celery_tasks/test
├── __init__.py
└── mytest.py
的内容__init__.py
如下:
from .mytest import test
但是我在芹菜中看不到新创建的任务。例如,如果我执行以下操作,我看不到celery_tasks.test.s
列表中的任务:
from celery.task.control import inspect
i = inspect()
i.registered_tasks()
我在这个过程中遗漏了什么吗?