I'm writing an application which needs to run a series of tasks in parallel and then a single task with the results of all the tasks run:

# assumes celery is your Celery app instance
@celery.task
def power(value, expo):
    return value ** expo

@celery.task
def amass(values):
    print(values)

It's a very contrived and oversimplified example, but hopefully the point comes across well. Basically, I have many items which need to run through power, but I only want to run amass on the results from all of the tasks. All of this should happen asynchronously, and I don't need anything back from the amass method.

Does anyone know how to set this up in celery so that everything is executed asynchronously and a single callback with a list of the results is called after all is said and done?

I've set up this example to use a chord, as Alexander Afanasiev recommended:

from time import sleep
import random

from celery import chord

tasks = []

for i in range(10):
    # build a signature for power(i, 2)
    tasks.append(power.s(i, 2))
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms

callback = amass.s()

r = chord(tasks)(callback)

Unfortunately, in the example above, none of the tasks in tasks starts until chord(tasks)(callback) is called. Is there a way for each task to start on its own, and then add a callback to the group that runs once everything has finished?
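Outside Celery, the behaviour being asked for (start each task as soon as it is created, then attach one callback for the whole set later) can be sketched with the standard library. This is an in-process analogue using concurrent.futures, not Celery's API: threads stand in for workers, and when_all_done is a hypothetical helper name.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def power(value, expo):
    return value ** expo


def when_all_done(futures, callback):
    """Call callback(results) exactly once, after every future has finished.

    Results are passed in the order of `futures`, not completion order.
    """
    futures = list(futures)
    remaining = len(futures)
    lock = threading.Lock()

    def on_done(_fut):
        nonlocal remaining
        with lock:
            remaining -= 1
            is_last = remaining == 0
        if is_last:
            callback([f.result() for f in futures])

    if not futures:
        callback([])
        return
    for fut in futures:
        # runs on_done immediately if this future has already finished
        fut.add_done_callback(on_done)


collected = []
done = threading.Event()


def amass(values):
    collected.extend(values)
    done.set()


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(power, i, 2) for i in range(10)]  # each starts right away
    when_all_done(futures, amass)  # the callback is attached afterwards
    done.wait(timeout=5)

print(collected)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Because every future decrements the counter exactly once, it does not matter whether a task finishes before or after its callback is attached.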

4 Answers

Here's a solution which worked for my purposes:

tasks.py:

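from time import sleep
import random

# assumes celery is your Celery app instance
@celery.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms
    return value ** expo

# Passing AsyncResult objects as arguments needs a serializer that can
# handle them, such as pickle (Celery's default before 4.0); JSON cannot.
@celery.task
def amass(results, tasks):
    pending = []
    for task in tasks:
        if task.ready():
            # read .result instead of calling .get() inside a task,
            # which newer Celery versions refuse to do
            results.append(task.result)
        else:
            pending.append(task)

    if pending:
        # re-send this task to run again in about 1 second;
        # countdown is an apply_async() option, not a delay() argument
        amass.apply_async((results, pending), countdown=1)
    else:
        # we're done
        print(results)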

Use case:

from tasks import power, amass

tasks = []

for i in range(10):
    tasks.append(power.delay(i, 2))

amass.delay([], tasks)

What this should do is start all of the tasks asynchronously as soon as possible. Once they have all been posted to the queue, the amass task is posted as well. It then keeps re-posting itself, about once a second, until all of the other tasks have completed.
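The polling idea in this answer can be tried without a broker. A minimal sketch, assuming concurrent.futures futures stand in for Celery AsyncResult objects (done() for ready(), result() for .result) and a sleep loop stands in for re-sending amass with a countdown; amass_step is a hypothetical name.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def power(value, expo):
    time.sleep(0.01 * (value % 3))  # simulate uneven run times
    return value ** expo


def amass_step(results, pending):
    """One polling pass: move finished results into `results` and
    return the futures that are still running."""
    still_pending = []
    for fut in pending:
        if fut.done():
            results.append(fut.result())
        else:
            still_pending.append(fut)
    return still_pending


def amass(pending, countdown=0.05):
    results = []
    while pending:
        pending = amass_step(results, pending)
        if pending:
            # the Celery version re-sends the task with a countdown instead
            time.sleep(countdown)
    return results


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(power, i, 2) for i in range(10)]
    collected = amass(futures)

print(sorted(collected))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Results are collected in the order they are found to be finished, not in submission order, so the sketch sorts them before printing.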

answered 2013-05-03T00:07:05.677

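Celery has plenty of tools for most of the workflows you can imagine.

It looks like you need to use a chord. Here's a quote from the docs:

A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.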
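To make the header/body split concrete, the quoted semantics can be modelled in plain Python. This is a local analogue only: run_chord is a hypothetical helper, not part of Celery, and a real chord runs its header on workers and needs a result backend so the body can be triggered once all header results are stored. The sketch passes results to the body in header order.

```python
from concurrent.futures import ThreadPoolExecutor


def power(value, expo):
    return value ** expo


def run_chord(header, body, max_workers=4):
    """Run every (func, args) pair in `header` concurrently, then call
    body(results) once with the results in header order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        results = [f.result() for f in futures]  # waits for every header task
    return body(results)


header = [(power, (i, 2)) for i in range(10)]
total = run_chord(header, sum)
print(total)  # 285
```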

answered 2013-05-01T04:03:10.207

Looking at this snippet from your question, it seems you are passing a list as the chord header rather than a group:

from time import sleep
import random

from celery import chord

tasks = []

for i in range(10):
    tasks.append(power.s(i, 2))
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms

callback = amass.s()

r = chord(tasks)(callback)

Converting the list into a group should give you the behavior you expect:

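from celery import chord, group

...

callback = amass.s()

# wrap the list of signatures in a group to use it as the chord header
tasks = group(tasks)

r = chord(tasks)(callback)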
answered 2015-07-10T00:38:08.787

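The answer @alexander-afanasiev gave you is essentially correct: use a chord.

Your code is fine, but tasks.append(power.s(i, 2)) does not actually execute the subtask; it only adds the subtask to the list. It is chord(...)(...) that sends the broker as many messages as there are subtasks defined in your tasks list, plus one more message for the callback subtask. The chord call itself returns right away.

If you want to know when the chord has finished, you can poll for completion with r.ready(), just as you would for a single task in your example.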
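The difference between building a subtask and running it resembles the difference between functools.partial and an actual call. This local analogue assumes partial as a stand-in for power.s(i, 2); it shows that building the list executes nothing, but it does not model the broker messages.

```python
from functools import partial

executed = []


def power(value, expo):
    executed.append((value, expo))  # record every real execution
    return value ** expo


# Building the list only stores (function, arguments) pairs; nothing runs yet.
tasks = [partial(power, i, 2) for i in range(10)]
print(len(executed))  # 0

# Execution happens only when the stored calls are invoked, which is the
# role chord(...)(...) plays in Celery when it sends the messages.
results = [task() for task in tasks]
print(len(executed), results[:4])  # 10 [0, 1, 4, 9]
```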
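A polling loop of that kind can be sketched generically. Here a concurrent.futures future stands in for the chord's result, and its done method plays the role of r.ready; with Celery you would pass r.ready itself. wait_until_ready is a hypothetical helper.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_until_ready(is_ready, interval=0.05, timeout=5.0):
    """Poll is_ready() every `interval` seconds; return True once it
    reports completion, or its final answer after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_ready():
            return True
        time.sleep(interval)
    return is_ready()


with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(time.sleep, 0.2)
    finished = wait_until_ready(fut.done)

print(finished)  # True
```

When blocking is acceptable (outside a task), Celery's r.get(timeout=...) waits for the result without a hand-written loop.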

answered 2013-05-02T09:23:49.410