我有一个用 Django 编写的 REST API,其端点在发布到芹菜任务时对其进行排队。响应包含我想用来测试任务是否已创建并获得结果的任务 ID。所以,我想做类似的事情:
def test_async_job():
response = self.client.post("/api/jobs/", some_test_data, format="json")
task_id = response.data['task_id']
result = my_task.AsyncResult(task_id).get()
self.assertEquals(result, ...)
我显然不想让 celery worker 运行单元测试,我希望以某种方式模拟它。我不能使用CELERY_ALWAYS_EAGER因为这似乎完全绕过了代理,阻止我使用 AsyncResult 通过其 ID 获取任务(如此处所述)。
通过 celery 和kombu docs,我发现有一个用于单元测试的内存传输,可以满足我的需求。我尝试覆盖BROKER_URL
设置以在测试中使用它:
@override_settings(BROKER_URL='memory://')
def test_async_job():
但行为与 ampq 代理相同:它阻止测试等待结果。任何想法我应该如何配置这个代理让它在测试中工作?