0

我有一个复杂的场景需要解决。

我正在使用 Celery 并行运行任务,我的任务涉及 HTTP 请求,我计划将 Celery 与 eventlet 一起用于此目的。

让我解释一下我的场景:

我有 2 个可以并行运行的任务和需要处理这 2 个任务的输出的第三个任务,因此我使用 Celery 组来运行 2 个任务和 Celery 链将输出传递给第三个任务来处理它当他们完成时。

现在它变得复杂了,第三个任务需要产生多个我想并行运行的任务,我想将所有输出收集在一起并在另一个任务中处理它。

所以我为多个任务创建了一个组,并用一个链来处理所有信息。

我想我缺少有关 Celery 并发原语的基本信息,我有一个 1 celery 任务运行良好,但我需要让它更快。

这是代码的简化示例:

@app.task
def task2():
       return "aaaa"

@app.task
def task3():
       return "bbbb"

@app.task
def task4():
    work = group(...) | task5.s(...)                                                          
    work()

@app.task
def task1():   
        tasks = [task2.s(a, b), task3.s(c, d)]
    work = group(tasks) | task4.s()
    return work()

这就是我开始此操作的方式:

    task = tasks1.apply_async(kwargs=kwargs, queue='queue1')

我保存 task.id 并每 30 秒拉一次服务器以查看结果是否可用:

results = tasks1.AsyncResult(task_id)
if results.ready():
     res = results.get()
4

0 回答 0