我将 django 与 celery 和 redis 一起使用来处理异步任务。我定义了三个任务,它们应该在您自己的队列中运行。
我的项目结构如下所示:
django-project
|- api
|- task.py
|- view.py
|- django-project
|- settings.py
|- celery.py
|- __init__.py
我在我的 api 应用程序的 task.py 中定义的任务:
@shared_task
def manually_task(website_id):
print("manually_task");
website = Website.objects.get(pk=website_id)
x = Proxy(website, "49152")
x.startproxy()
x = None
@periodic_task(run_every=(crontab(hour=19, minute=15)), ignore_result=True)
def periodically_task():
websites = Website.objects.all()
for website in websites:
x = Proxy(website, "49153")
x.startproxy()
x = None
@shared_task
def firsttime_task(website_id):
website = Website.objects.get(pk=website_id)
x = Proxy(website, "49154")
x.startproxy()
x = None
现在这是我的初始化.py
__all__ = ('celery_app',)
和settings.py中的芹菜设置:
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_DEFAULT_QUEUE = 'red'
CELERY_TASK_QUEUES = (
Queue('red', Exchange('red'), routing_key='red'),
)
CELERY_ROUTES = {
'api.tasks.manually_task': {'queue': 'red'},
}
我的 celery.py 看起来像这样:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django-project.settings')
app = Celery('django-project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
这是我的设置。现在我开始所有需要的东西(自己终端中的每一行):
redis-server
celery -A django-project worker -Q red
python3 manage.py runserver 0.0.0.0:8000
一切开始都没有问题。在视图中,我这样称呼任务:
manually_task.delay(webseite.pk)
但在工人无所事事。CELERY_TASK_QUEUES
如果我在没有,CELERY_DEFAULT_QUEUE
和CELERY_ROUTES
设置的情况下尝试此操作settings.py
并正常启动工作程序,celery -A django-project worker
则它可以正常工作。我做错了什么?