0

我有一个具有以下层次结构的 django 项目:

.
├── api_v_1_0
├── celery_tasks
├── db.sqlite3
├── doc
├── manage.py
├── __pycache__
├── rockynode
├── run.sh
├── static
├── staticfiles
└── web

所有 celery 任务都在celery_tasks应用程序中:

celery_tasks
├── apps.py
├── emitter
├── __init__.py
├── mongo
├── __pycache__
├── rockynodemail
├── test

模块是新添加的test模块,任务如下:

from rockynode.celerys import celery_app_basic
class test:

    @celery_app_basic.task(queue='General')
    def s(x):
        return "{}"

测试模块主要包含两个文件:

celery_tasks/test
├── __init__.py
└── mytest.py

的内容__init__.py如下:

from .mytest import test

但是我在芹菜中看不到新创建的任务。例如,如果我执行以下操作,我看不到celery_tasks.test.s列表中的任务:

from celery.task.control import  inspect
i = inspect()
i.registered_tasks()

我在这个过程中遗漏了什么吗?

4

0 回答 0