在 Celery 2 中,我有一个 TaskSet,其设置如下:
for (item,jobId) in itemsAndJobs:
tasks.append(waitForOutput.subtask((jobId,item)))
job = TaskSet(tasks)
result = job.apply_async()
然后稍后,我将通过以下检查来检查 TaskSet 是否已完成:
job.ready() and job.successful()
这工作得很好——waitForOutput 任务会慢慢移动,然后当它们都完成后,作业检查就会完成。我可以多次检查没有任何问题。
在 celery 3 中,我尝试了将其更改为组的快速而肮脏的方法,只需更改
TaskSet(tasks)
至
group(tasks)
除非我等到所有 waitForOutput 任务完成后才检查就绪并成功,否则这永远不会奏效。ready() 总是返回 false。我添加了一些日志记录和 30 秒的默认重试,这就是我所看到的 -
- 启动 5 个 waitForOutput 作业
- 检查 ready(),没有 waitForOutput 作业完成,ready 为 false
- 2 个 waitForOutput 作业完成
- 检查 ready(),ready 为 false,job.check_completed() 为 2
- 检查 ready(),ready 为 false,job.check_completed() 为 0
- 剩余 3 个 waitForOutput 作业完成
- 检查 ready(),ready 为 false,job.check_completed() 为 3
如果我使用 Celery 3 代码并从任务中导入 TaskSet 并使用它而不是组,我会看到相同的行为。
我很想被告知我只是错误地使用了组!