0

在 Celery 2 中,我有一个 TaskSet,其设置如下:

for (item,jobId) in itemsAndJobs:
    tasks.append(waitForOutput.subtask((jobId,item)))
job = TaskSet(tasks)
result = job.apply_async()

然后稍后,我将通过以下检查来检查 TaskSet 是否已完成:

job.ready() and job.successful()

这工作得很好——waitForOutput 任务会慢慢移动,然后当它们都完成后,作业检查就会完成。我可以多次检查没有任何问题。

在 celery 3 中,我尝试了将其更改为组的快速而肮脏的方法,只需更改

TaskSet(tasks)

group(tasks)

除非我等到所有 waitForOutput 任务完成后才检查就绪并成功,否则这永远不会奏效。ready() 总是返回 false。我添加了一些日志记录和 30 秒的默认重试,这就是我所看到的 -

  • 启动 5 个 waitForOutput 作业
  • 检查 ready(),没有 waitForOutput 作业完成,ready 为 false
  • 2 个 waitForOutput 作业完成
  • 检查 ready(),ready 为 false,job.check_completed() 为 2
  • 检查 ready(),ready 为 false,job.check_completed() 为 0
  • 剩余 3 个 waitForOutput 作业完成
  • 检查 ready(),ready 为 false,job.check_completed() 为 3

如果我使用 Celery 3 代码并从任务中导入 TaskSet 并使用它而不是组,我会看到相同的行为。

我很想被告知我只是错误地使用了组!

4

1 回答 1

0

我通过迁移到 Redis 作为我的结果后端解决了这个问题。使用 AMQP 作为结果后端似乎是一个错误。

于 2012-08-23T15:20:11.993 回答