我有以下设置:
- Django-Celery项目A注册任务
foo
- 项目 B:使用 Celery 的 send_task 调用
foo
- 项目 A 和项目 B 的配置相同:SQS、用于序列化的 msgpack、gzip 等。
- 每个项目都存在于不同的 github 存储库中
我在项目 A 中对“foo”的调用进行了单元测试,根本没有使用 Celery,只是foo(1,2,3)
断言结果。我知道它有效。
我已经对项目 B 中的 send_task 发送正确的参数进行了单元测试。
我没有测试,需要你建议的是两个项目之间的集成。我想要一个单元测试,它将:
- 在项目 A 的上下文中启动一个工人
- 使用项目B的代码发送任务
- 断言在第一步中启动的工作人员使用我在第二步中发送的参数获取了任务,并且该
foo
函数返回了预期的结果。
似乎可以通过使用python的子进程并解析worker的输出来破解它,但这很丑陋。在这种情况下,推荐的单元测试方法是什么?您可以分享任何代码片段吗?谢谢!