3

假设我有一个非常简单的任务,如下所示:

@celery.task(ignore_result=True)
def print_page(page):
    with open('path/to/page','w') as f:
        f.write(page)

(请忽略上述代码中潜在的竞争条件......这是一个简化的例子)

我的问题是以下两个代码示例是否会产生相同的结果,或者一个是否比另一个更好:

选项A:

@celery.task(ignore_result=True)
def print_pages(page_generator):
    for page in page_generator:
        print_page.s(page).apply_async()

选项 B:

@celery.task(ignore_result=True)
def print_pages(page_generator):
    g = group(print_page.s(page) for page in page_generator)
    g.apply_async()

总的来说,我很好奇以上是否是做我正在做的事情的正确方法。本质上,我还有另一个任务是解析一些数据并返回一个生成器,该生成器将发出文档的所有页面。对于每一页,我想分别输出。

所以,我的链看起来像这样(也简化了):

chain = fetch.s(url) | parse.s() | print_pages.s()
chain()

我认为如果我能以某种方式在该链内和那里的组(在实际任务之外)发出生成器,那将更有意义。但我不确定这是否实用或理想。我真的很感激任何帮助。谢谢!

4

2 回答 2

2

您的第一个选择似乎更好。您不想加入扇出 print_pages 任务的结果(假设 ignore_result=True),因此组会增加不必要的开销/复杂性。只需像选项 A 中那样单独调用任务,就可以了。

此外,我想指出 Python 生成器不会腌制,因此您不能将它们异步传递给 Celery 任务。

于 2013-01-30T23:36:55.530 回答
1

在您的情况下,这两种解决方案都是正确的,没有依赖于页面任务,但是假设您有一个任务分为子任务,并且所有这些子任务都是按顺序排列的,在这种情况下,您应该通过选择 B 对其进行分组

于 2013-01-30T22:00:37.307 回答