2

也许我对芹菜有误解,但我已经坚持了很长一段时间。

我有一堆简单的子任务,我想并行运行,我想在它们完成时对其进行迭代,而不是等待它们全部完成。我试过这个:

def task_generator():
   for row in db:
      yield mytask.s(row)

from celery.result import ResultSet
r = ResultSet(t.delay() for t in task_generator())
for result in r.iterate():
    print result

然而 celery 首先运行所有任务,并且迭代仅在所有任务完成后才开始,尽管有ResultSet.iterate阅读文档"Iterate over the return values of the tasks as they finish one by one."

那么如何在任务结果完成时对其进行迭代呢?

4

1 回答 1

0

我已经实现了这一点,并在过去使用过它。我认为.iterate()它已被弃用,所以我自己正在研究一个新的解决方案。

@task
def task_one(foo, bar):
    return foo + bar

subtasks = []
for x in range(10):
    subtasks.append(task_one.s(x, x+1))

results = celery.Group(subtasks)()  # call this
for result in results.iterate(propagate=False):
    answer = result.iteritems().next()
于 2014-02-19T01:11:15.120 回答