11

如果原始计算失败,有没有办法使用简单的池重新发送一条数据进行处理?

import random
from multiprocessing import Pool

def f(x):
   if random.getrandbits(1):
       raise ValueError("Retry this computation")
   return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
4

2 回答 2

18

如果您可以(或不介意)立即重试,请使用包装函数的装饰器:

import random
from multiprocessing import Pool
from functools import wraps

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
于 2012-07-24T05:54:04.363 回答
10

您可以使用 aQueue将失败反馈到Poolinitiating 中的循环中Process

import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if not r is None])
        successful.extend([r for r in retry_results if not r is None])
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items);
    p.close()
    p.join()

    print "Results: %s" % successful
    print "Failures: %s" % failure_tracker

if __name__ == '__main__':
    main(range(1, 10))

输出是这样的:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]

Pool不能在多个进程之间共享。因此,这种Queue基于方法。如果您尝试将池作为参数传递给池进程,您将收到以下错误:

NotImplementedError: pool objects cannot be passed between processes or pickled

您也可以在您的函数中尝试立即重试几次f,以避免同步开销。这实际上是您的函数应该等待多长时间重试,以及如果立即重试成功的可能性有多大。


旧答案: 为了完整起见,这是我的旧答案,它不如直接重新提交到池中那样最佳,但根据用例可能仍然相关,因为它提供了一种自然的方式来处理/限制n级重试:

您可以使用 aQueue聚合失败并在每次运行结束时重新提交,在多次运行中:

import multiprocessing as mp
import random


def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()

        p.join()
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if not r is None]
        print "(%d) Succeeded: %s" % (run_number, successful)
        print "(%d) Failed:    %s" % (run_number, failed_items)
        print
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))

输出如下:

(1) Succeeded: [9, 16, 36, 81]
(1) Failed:    [2, 1, 5, 7, 8]

(2) Succeeded: [64]
(2) Failed:    [2, 1, 5, 7]

(3) Succeeded: [1, 25]
(3) Failed:    [2, 7]

(4) Succeeded: [49]
(4) Failed:    [2]

(5) Succeeded: [4]
(5) Failed:    []
于 2012-07-24T03:26:49.443 回答