4

我正在使用 Celery 运行数千个任务组,其中每个任务都需要几分钟才能运行。下面的代码是我的简单替换multiprocessing.pool.Pool.map

def map(task, data):
    """
    Perform the *task* on *data* in distributed way. Blocks until finished.
    """
    ret = celery_module.group(task.s(val) for val in data).apply_async()
    return ret.get(interval = 0.1)

只要工人从不休息,这就像一种魅力。但有时会发生一个节点死亡,并带走一些正在运行的任务。然后发生的事情是所有其他任务都完成了,工人变得空闲,但get永远等待死亡工人的结果。

如何让死任务在超时后重试?这些任务是幂等的,我完全不担心重复执行。我试过CELERY_ACKS_LATE在这里和那里玩弄和设置超时,但似乎没有任何东西可以纠正这种情况。我觉得我错过了一些明显的东西,但找不到什么。

编辑:用于代理和结果的传输是 Redis。

4

1 回答 1

0

这里的正确行为是设置超时,当超时重试整个map任务。

于 2012-11-22T08:03:25.513 回答