91

如何在多进程 python 程序中捕获Ctrl+C并优雅地退出所有进程,我需要该解决方案在 unix 和 windows 上都可以工作。我尝试了以下方法:

import multiprocessing
import time
import signal
import sys

jobs = []

def worker():
    signal.signal(signal.SIGINT, signal_handler)
    while(True):
        time.sleep(1.1234)
        print "Working..."

def signal_handler(signal, frame):
    print 'You pressed Ctrl+C!'
    # for p in jobs:
    #     p.terminate()
    sys.exit(0)

if __name__ == "__main__":
    for i in range(50):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

它有点工作,但我认为这不是正确的解决方案。

4

3 回答 3

89

先前接受的解决方案具有竞争条件,并且不适用于mapasync功能。


处理Ctrl+C/ SIGINTwith的正确方法multiprocessing.Pool是:

  1. SIGINT在创建进程之前忽略该进程Pool。这种方式创建的子进程继承SIGINT处理程序。
  2. SIGINT在创建a 之后恢复父进程中的原始处理程序Pool
  3. 使用map_asyncandapply_async而不是阻塞mapand apply
  4. 等待结果超时,因为默认阻塞等待忽略所有信号。这是 Python 错误https://bugs.python.org/issue8296

把它放在一起:

#!/bin/env python
from __future__ import print_function

import multiprocessing
import os
import signal
import time

def run_worker(delay):
    print("In a worker process", os.getpid())
    time.sleep(delay)

def main():
    print("Initializng 2 workers")
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = multiprocessing.Pool(2)
    signal.signal(signal.SIGINT, original_sigint_handler)
    try:
        print("Starting 2 jobs of 5 seconds each")
        res = pool.map_async(run_worker, [5, 5])
        print("Waiting for results")
        res.get(60) # Without the timeout this blocking call ignores all signals.
    except KeyboardInterrupt:
        print("Caught KeyboardInterrupt, terminating workers")
        pool.terminate()
    else:
        print("Normal termination")
        pool.close()
    pool.join()

if __name__ == "__main__":
    main()

正如@YakovShklarov 所指出的,在父进程中忽略信号和取消忽略信号之间有一个时间窗口,在此期间信号可能会丢失。改为临时阻止父进程中的信号传递将pthread_sigmask防止信号丢失,但是,它在 Python-2 中不可用。

于 2016-02-01T15:33:35.283 回答
37

该解决方案基于此链接此链接,它解决了问题,但我不得不搬到Pool

import multiprocessing
import time
import signal
import sys

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def worker():
    while(True):
        time.sleep(1.1234)
        print "Working..."

if __name__ == "__main__":
    pool = multiprocessing.Pool(50, init_worker)
    try:
        for i in range(50):
            pool.apply_async(worker)

        time.sleep(10)
        pool.close()
        pool.join()

    except KeyboardInterrupt:
        print "Caught KeyboardInterrupt, terminating workers"
        pool.terminate()
        pool.join()
于 2012-07-03T14:20:50.037 回答
15

只需在工作进程中处理 KeyboardInterrupt-SystemExit 异常:

def worker():
    while(True):
        try:
            msg = self.msg_queue.get()
        except (KeyboardInterrupt, SystemExit):
            print("Exiting...")
            break
于 2013-05-01T18:44:29.723 回答