由于复杂的回调/链接设置,我们基础设施中的一些任务似乎要进入未分配的队列。所以我想编写自动化测试以确保将芹菜任务发送到指定要处理的队列。
设置示例:
from celery import Celery
celery = Celery()
@celery.task(base=MyTask, queue='mytasks.add')
def add(x, y):
a = x + y
return a
@celery.task(base=MyTask, queue='mytasks.dadd')
def double_add(a, y):
b = a + y
def caller(x, y):
add.apply_async(args=(2, 1), kwargs={''callback': double_add.subtask(args=(3)) })
所以这里的“add”应该由queue='mytasks.add'来处理,而“double_add”应该由queue='mytasks.dadd'来处理
我了解 celery 基于结果的基本测试,如下所示:How do you unit test a Celery task?
但我希望能深入了解上述场景的测试过程。