我有一个使用 djcelery 的定期任务。此任务正在调查数据库,如有必要,它会进行相同的更改。如您所知,“python manage.py test”创建了一个测试数据库,那么我如何在这个测试数据库中运行这个周期性任务呢?
我尝试先运行:python manage.py test
然后当我尝试运行以下
python manage.py celery worker -l info &
python manage.py celery beat -l info &
测试终止
我有一个使用 djcelery 的定期任务。此任务正在调查数据库,如有必要,它会进行相同的更改。如您所知,“python manage.py test”创建了一个测试数据库,那么我如何在这个测试数据库中运行这个周期性任务呢?
我尝试先运行:python manage.py test
然后当我尝试运行以下
python manage.py celery worker -l info &
python manage.py celery beat -l info &
测试终止
你不需要运行 celery workers 或 celery beat,那不是很好的单元测试。您正在尝试测试该任务,因此您应该隔离该任务并对其进行测试。您需要使用 CELERY_ALWAYS_EAGER 模式运行测试,或者只是从测试内部运行您的任务而不使用异步模式。
为了避免使用异步,只需像这样调用你的任务:
@celery.task
def task_to_test():
print('stuff')
def test:
task_to_test()
如果你不能这样做。覆盖您的设置以使用总是渴望。
from tasks import task_to_test
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CELERY_ALWAYS_EAGER=True,
BROKER_MEMORY='memory',
CACHES = {'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',}})
class MyTests(TestCase):
def test():
task_to_test.delay()