32

我想从 Celery 任务返回的列表中创建一个组,这样对于任务结果集中的每个项目,都会将一个任务添加到组中。

这是一个解释用例的简单代码示例。???应该是上一个任务的结果。

@celery.task
def get_list(amount):
    # In reality, fetch a list of items from a db
    return [i for i in range(amount)]

@celery.task
def process_item(item):
    #do stuff
    pass

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))

我可能没有正确处理这个问题,但我很确定从任务中调用任务是不安全的:

@celery.task
def process_list():
    for i in get_list.delay().get():
        process_item.delay(i)

我不需要秒任务的结果。

4

1 回答 1

46

您可以使用中间任务获得这种行为。这是创建类似于您建议的“地图”方法的演示。

from celery import task, subtask, group

@task
def get_list(amount):
    return [i for i in range(amount)]

@task
def process_item(item):
    # do stuff
    pass

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s()))

感谢 Ask Solem 在我就类似问题向他寻求帮助时给了我这个建议。

于 2012-11-26T17:20:55.330 回答