3

当其中一个孩子中止和/或抛出异常时,中止多处理的正确方法是什么?

我发现了各种各样的问题(通用多处理错误处理如何在异常但没有答案时关闭多处理池,...),但没有关于如何停止对子异常进行多处理的明确答案。

例如,我期望以下代码:

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)


def main():
    with Pool(4) as p:
        try:
            r = p.map(f, range(7))
        except Exception as e:
            print(f"oops: {e}")
            p.close()
            p.terminate()
    print("end")


if __name__ == '__main__':
    main()

输出:

f(0)
f(1)
f(2)
oops: float division by zero
end

相反,它f在检测/处理异常之前对所有项目应用函数:

f(0)
f(1)
f(2)
f(4)
f(3)
f(5)
f(6)
oops: float division by zero
end

没有办法直接捕获异常吗?

4

1 回答 1

2

我认为您将需apply_async要这样做,因此您可以对每一个结果而不是累积结果采取行动。pool.apply_async提供error_callback可用于注册错误处理程序的参数。apply_async没有阻塞,所以你需要去join()游泳池。我还使用一个标志terminated来知道在没有发生异常的情况下何时可以正常处理结果。

from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)

def on_error(e):
    global terminated
    terminated = True
    pool.terminate()
    print(f"oops:{e}")


def main():
    global pool
    global terminated

    terminated = False

    pool = Pool(4)
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(7)]
    pool.close()
    pool.join()

    if not terminated:
        for r in results:
            print(r.get())

    print("end")


if __name__ == '__main__':
    main()
于 2018-09-11T23:01:35.433 回答