6

我正在使用 Python 2.7。

我目前正在使用这样的 ThreadPoolExecuter:

params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    result = list(executor.map(f, params))

问题是f有时运行时间过长。每当我运行f时,我想将其运行时间限制为 100 秒,然后将其杀死。

最终,对于 中的每个元素xparam我想知道是否f必须被杀死,如果不是 - 返回值是多少。即使f一个参数超时,我仍然想用下一个参数运行它。

executer.map方法确实有一个timeout参数,但它为整个运行设置了超时,从调用到 的时间executer.map,而不是单独为每个线程设置超时。

获得所需行为的最简单方法是什么?

4

1 回答 1

8

这个答案是关于 python 的多处理库,它通常比线程库更可取,除非你的函数只是在等待网络调用。请注意,多处理和线程库具有相同的接口。

鉴于您的进程每个可能运行 100 秒,相比之下,为每个进程创建一个进程的开销相当小。您可能必须创建自己的流程才能获得必要的控制。

一种选择是将 f 包装在另一个最多执行 100 秒的函数中:

from multiprocessing import Pool

def timeout_f(arg):
    pool = Pool(processes=1)
    return pool.apply_async(f, [arg]).get(timeout=100)

然后您的代码更改为:

    result = list(executor.map(timeout_f, params))

或者,您可以编写自己的线程/进程控制:

from multiprocessing import Process
from time import time

def chunks(l, n):
    """ Yield successive n-sized chunks from l. """
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

processes = [Process(target=f, args=(i,)) for i in params]
exit_codes = []
for five_processes = chunks(processes, 5):
    for p in five_processes:
        p.start()
    time_waited = 0
    start = time()
    for p in five_processes:
        if time_waited >= 100:
            p.join(0)
            p.terminate()
        p.join(100 - time_waited)
        p.terminate()
        time_waited = time() - start
    for p in five_processes:
        exit_codes.append(p.exit_code)

您必须通过诸如Can I get a return value from multiprocessing.Process 之类的方法来获取返回值?

如果进程已完成,则进程的退出代码为 0,如果它们被终止,则为非零。

技术来自: 加入一组带有超时的python进程你如何将一个列表分成大小均匀的块?


作为另一种选择,您可以尝试在multiprocessing.Pool上使用 apply_async

from multiprocessing import Pool, TimeoutError
from time import sleep    

if __name__ == "__main__":
    pool = Pool(processes=5)
    processes = [pool.apply_async(f, [i]) for i in params]
    results = []
    for process in processes:
        try:
            result.append(process.get(timeout=100))
        except TimeoutError as e:
            results.append(e)

请注意,上述每个进程可能等待超过 100 秒,就好像第一个进程需要 50 秒才能完成,第二个进程在运行时将有 50 秒的额外时间。需要更复杂的逻辑(例如前面的示例)来强制执行更严格的超时。

于 2014-09-22T15:35:38.713 回答