3

我正在使用 django-nose 为 celery 任务编写单元测试。这是相当典型的;一个空白测试数据库(REUSE_DB=0),在测试时通过夹具预填充。

我遇到的问题是,即使 TestCase 正在加载夹具并且我可以从测试方法访问对象,但在异步 celery 任务中执行相同的查询时会失败。

我检查了 settings.DATABASES["default"]["name"] 在测试方法和被测任务中是否相同。我还验证了被测任务在作为常规方法调用调用时行为正确。

这就是我没有想法的地方。

这是一个示例:

class MyTest(TestCase):
    fixtures = ['test_data.json']

    def setUp(self):
        settings.CELERY_ALWAYS_EAGER = True # seems to be required; if not I get socket errors for Rabbit
        settings.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True # exposes errors in the code under test.

    def test_city(self):
        self.assertIsNotNone(City.objects.get(name='brisbane'))
        myTask.delay(city_name='brisbane').get()
        # The following works fine: myTask('brisbane')

from celery.task import task

@task()
def myTask(city_name):
    c = City.objects.count() # gives 0
    my_city = City.objects.get(name=city_name) # raises DoesNotExist exception
    return
4

1 回答 1

2

这听起来很像 2.5.2 中修复的 django-celery 2.5 中的错误:https ://github.com/celery/django-celery/pull/116

该错误的简要描述是 django-celery 加载程序在执行任务甚至急切任务之前关闭了数据库连接。由于测试在事务中运行,任务执行的新连接无法看到在setUp.

于 2012-06-08T20:20:48.960 回答