我正在尝试使用 python 模拟库来修补在我的 django 应用程序中保存模型时运行的 Celery 任务,以查看它是否被正确调用。
基本上,任务是在内部定义的myapp.tasks
,并在我的 models.py 文件的顶部导入,如下所示:
from .tasks import mytask
...然后save()
使用mytask.delay(foo, bar)
. 到目前为止一切顺利 - 当我实际运行 Celeryd 等时效果很好。
我想构建一个模拟任务的单元测试,只是为了检查它是否被正确的参数调用,并且实际上并没有尝试运行 Celery 任务。
因此,在测试文件中,我在标准 TestCase 中有这样的内容:
from mock import patch # at the top of the file
# ...then later
def test_celery_task(self):
with patch('myapp.models.mytask.delay') as mock_task:
# ...create an instance of the model and save it etc
self.assertTrue(mock_task.called)
...但它永远不会被调用/总是错误的。我尝试了各种化身(myapp.models.mytask
而不是打补丁,并检查是否mock_task.delay
被调用。我从模拟文档中收集到导入路径至关重要,谷歌搜索告诉我它应该是在模块内部看到的路径测试(如果我理解正确,那就是myapp.models.mytask.delay
而不是)。myapp.tasks.mytask.delay
我在哪里错了?修补 Celery 任务是否有一些特定的困难?我可以修补celery.task
(用作装饰器mytask
)吗?