
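I have an executable that I need to run very often, with different parameters. For this I wrote a small Python (2.7) wrapper using the multiprocessing module, following the pattern given here.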

My code looks like this:

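# pool (a multiprocessing.Pool with numproc workers), params and run_nlin
# are set up elsewhere; logging and time are imported at module level.
try:
    logging.info("starting pool runs")
    pool.map(run_nlin, params)
    pool.close()
except KeyboardInterrupt:
    logging.info("^C pressed")
    pool.terminate()
except Exception as e:
    logging.info("exception caught: %s", e)
    pool.terminate()
finally:
    time.sleep(5)
    pool.join()
    logging.info("done")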

And here is my worker function:

import subprocess as sp
import time

class KeyboardInterruptError(Exception): pass

def run_nlin(args):
    # One parameter tuple per call, as passed in by pool.map.
    path_config, path_log, path_nlin, update_method = args
    try:
        with open(path_log, "w") as log_:
            cmdline = [path_nlin, path_config]
            if update_method:
                cmdline += [update_method]
            sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        time.sleep(5)
        raise KeyboardInterruptError()

path_config is the path to the binary's configuration file; among other things, it holds the date the program should run for.
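To make the shape of params concrete, here is a minimal sketch of how the argument tuples for days 1-10 might be built. The directory names, the file-naming scheme and the build_params helper are all hypothetical; only the tuple order (path_config, path_log, path_nlin, update_method) comes from run_nlin above.

```python
import os

# Hypothetical locations: none of these paths come from the question.
PATH_NLIN = "/path/to/nlin"        # hypothetical binary location
CONFIG_DIR = "/path/to/configs"    # hypothetical: one config file per day
LOG_DIR = "/path/to/logs"          # hypothetical: one log file per day


def build_params(days, update_method=None):
    """Return one (path_config, path_log, path_nlin, update_method) tuple
    per day, in the order run_nlin unpacks them."""
    params = []
    for day in days:
        path_config = os.path.join(CONFIG_DIR, "day{:02d}.cfg".format(day))
        path_log = os.path.join(LOG_DIR, "day{:02d}.log".format(day))
        params.append((path_config, path_log, PATH_NLIN, update_method))
    return params


params = build_params(range(1, 11))
```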

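When I start the wrapper, everything looks fine. However, when I press ^C, the wrapper script seems to start another numproc processes from the pool before it terminates. For example, when I run the script for days 1-10, ps aux shows two instances of the binary running (usually for days 1 and 3). When I press ^C, the wrapper script exits and the binaries for days 1 and 3 disappear, but new binaries are running for days 5 and 7.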

So it looks to me as if the Pool starts another numproc processes before it finally dies.

Any ideas what is going on here, and what I can do about it?


1 Answer


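On this page, Jesse Noller, the author of the multiprocessing module, shows that the correct way to handle KeyboardInterrupt is to have the subprocesses return, not re-raise the exception. This allows the main process to terminate the pool.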
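Applied to the question's run_nlin, that pattern might look roughly like the sketch below: the worker catches KeyboardInterrupt itself and returns a marker value instead of raising. Returning the binary's exit code, and using None as the "interrupted" marker, are my own choices, not something taken from the linked page.

```python
import subprocess as sp


def run_nlin(args):
    """Run the external binary once for one parameter tuple.

    Sketch of the "return, don't re-raise" pattern: on Ctrl-C the worker
    swallows KeyboardInterrupt and returns None, leaving the decision to
    call pool.terminate() to the parent process.
    """
    path_config, path_log, path_nlin, update_method = args
    cmdline = [path_nlin, path_config]
    if update_method:
        cmdline.append(update_method)
    try:
        with open(path_log, "w") as log_:
            # Return the binary's exit code (an assumed, illustrative choice).
            return sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        # Interrupted: hand back a marker value instead of raising.
        return None
```

This only changes what each task returns; on its own it does not stop pool.map from starting the tasks that are still queued.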

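However, as the code below demonstrates, the main process does not reach the except KeyboardInterrupt block until all the tasks generated by pool.map have been run. This is why (I believe) you are seeing extra calls to your worker function, run_nlin, after pressing Ctrl-C.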
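A related detail may explain exactly which days show up. When no chunksize is given, pool.map does not hand out tasks one at a time but in chunks. The sketch below mirrors the heuristic CPython's Pool.map uses for the default chunksize (an implementation detail, not a documented guarantee); default_chunksize and split_into_chunks are illustrative helpers, and numproc = 2 is assumed from the two binaries seen in ps aux.

```python
def default_chunksize(n_tasks, n_workers):
    """Chunk size CPython's Pool.map picks when chunksize is None
    (assumes n_tasks > 0)."""
    chunksize, extra = divmod(n_tasks, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def split_into_chunks(tasks, size):
    """Split tasks into consecutive chunks of at most `size` items."""
    return [tasks[i:i + size] for i in range(0, len(tasks), size)]


days = list(range(1, 11))   # days 1-10, as in the question
numproc = 2                 # assumption: two worker processes
size = default_chunksize(len(days), numproc)
chunks = split_into_chunks(days, size)
print(size)    # 2
print(chunks)  # [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
```

With 10 days and 2 workers the chunks are [1, 2], [3, 4], [5, 6], and so on, so the first binaries started are for days 1 and 3. If, as I suspect, the interrupt makes run_nlin raise while it is working on day 1, the rest of that chunk (day 2) is abandoned and the worker moves on to its next chunk, which would explain why days 5 and 7 appear next.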

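One possible workaround is to have all the worker functions test whether a multiprocessing.Event has been set. If the event is set, the worker exits early; otherwise, it goes ahead with the long computation.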


import logging
import multiprocessing as mp
import time

logger = mp.log_to_stderr(logging.WARNING)

def worker(x):
    try:
        if not terminating.is_set():
            logger.warning("Running worker({x!r})".format(x=x))
            time.sleep(3)
        else:
            logger.warning("got the message... we're terminating!")
    except KeyboardInterrupt:
        logger.warning("terminating is set")
        terminating.set()
    return x

def initializer(terminating_):
    # This places terminating in the global namespace of the worker subprocesses.
    # This allows the worker function to access `terminating` even though it is
    # not passed as an argument to the function.
    global terminating
    terminating = terminating_

def main():
    terminating = mp.Event()
    result = []
    pool = mp.Pool(initializer=initializer, initargs=(terminating, ))
    params = range(12)
    try:
        logger.warning("starting pool runs")
        result = pool.map(worker, params)
        pool.close()
    except KeyboardInterrupt:
        logger.warning("^C pressed")
        pool.terminate()
    finally:
        pool.join()
        logger.warning('done: {r}'.format(r=result))

if __name__ == '__main__':
    main()

Running the script produces:

% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)

Here Ctrl-C is pressed; each worker sets the terminating event. We really only need one of them to set it, but this works, despite the small inefficiency.

  C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set

Now all the other tasks queued by pool.map run:

[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!

Finally, the main process reaches the except KeyboardInterrupt block.

[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []
answered 2013-01-29T13:11:33.780