150

如何使用 python 的多处理池处理 KeyboardInterrupt 事件?这是一个简单的例子:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

运行上面的代码时,KeyboardInterrupt当我按下 时会引发^C,但该过程只是在此时挂起,我必须在外部将其杀死。

我希望能够随时按下^C并导致所有进程正常退出。

4

11 回答 11

139

这是一个 Python 错误。在 threading.Condition.wait() 中等待条件时,永远不会发送 KeyboardInterrupt。复制:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

KeyboardInterrupt 异常在 wait() 返回之前不会被传递,并且它永远不会返回,因此中断永远不会发生。KeyboardInterrupt 应该几乎肯定会中断条件等待。

请注意,如果指定了超时,则不会发生这种情况;cond.wait(1) 将立即收到中断。因此,一种解决方法是指定超时。为此,请更换

    results = pool.map(slowly_square, range(40))

    results = pool.map_async(slowly_square, range(40)).get(9999999)

或类似的。

于 2009-09-11T00:45:17.267 回答
62

根据我最近的发现,最好的解决方案是将工作进程设置为完全忽略 SIGINT,并将所有清理代码限制在父进程中。这解决了空闲和忙碌工作进程的问题,并且不需要在您的子进程中使用错误处理代码。

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

解释和完整的示例代码可以分别在http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/http://github.com/jreese/multiprocessing-keyboardinterrupt找到。

于 2011-05-31T18:39:47.170 回答
31

Exception由于某些原因,只有从基类继承的异常才能正常处理。作为一种解决方法,您可以重新提出您KeyboardInterruptException实例:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

通常你会得到以下输出:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

因此,如果您点击^C,您将获得:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
于 2010-04-01T16:06:35.830 回答
15

投票的答案没有解决核心问题,而是类似的副作用。

multiprocessing.Pool多处理库的作者 Jesse Noller在一篇旧文中解释了如何正确处理 CTRL+C 的使用。

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
于 2017-07-02T09:43:59.807 回答
9

通常这种简单的结构适用于Ctrl- Con Pool :

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

正如一些类似的帖子所述:

在 Python 中捕获键盘中断而不使用 try-except

于 2012-10-31T13:44:37.497 回答
5

似乎有两个问题会在多处理烦人时产生异常。第一个(Glenn 指出)是您需要使用map_async超时而不是map为了获得立即响应(即,不要完成对整个列表的处理)。第二个(由 Andrey 指出)是多处理不会捕获不继承自Exception(例如,SystemExit)的异常。所以这是我处理这两个问题的解决方案:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
于 2014-05-15T15:23:24.483 回答
4

我发现,目前最好的解决方案是不使用 multiprocessing.pool 功能,而是使用自己的池功能。我提供了一个演示 apply_async 错误的示例,以及一个演示如何完全避免使用池功能的示例。

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

于 2010-08-26T17:16:12.710 回答
4

您可以尝试使用 Pool 对象的 apply_async 方法,如下所示:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

输出:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

此方法的一个优点是中断前处理的结果将在结果字典中返回:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
于 2018-08-14T05:30:30.360 回答
3

我是 Python 的新手。我到处寻找答案,偶然发现了这个以及其他一些博客和 youtube 视频。我试图复制粘贴上面作者的代码并在 Windows 7 64 位的 python 2.7.13 上重现它。它接近我想要实现的目标。

我让我的子进程忽略 ControlC 并使父进程终止。看起来绕过子进程确实为我避免了这个问题。

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

开始的部分pool.terminate()似乎永远不会执行。

于 2017-05-15T09:02:14.290 回答
2

如果您正在执行诸如 之类的方法,那么这些答案中的许多都是旧的和/或它们似乎不适用于Windows 上更高版本的 Python(我正在运行 3.8.5),该方法Pool.map会阻塞,直到所有提交的任务都完成为止。以下是我的解决方案。

  1. signal.signal(signal.SIGINT, signal.SIG_IGN)在主进程中发出调用以完全忽略 Ctrl-C。
  2. 处理池将使用一个池初始化程序初始化,该初始化程序将初始化每个处理器:全局变量ctrl_c_entered将设置为并发出False调用以最初忽略 Ctrl-C。此调用的返回值将被保存;这是重新建立时允许处理异常的原始默认处理程序。signal.signal(signal.SIGINT, signal.SIG_IGN)KyboardInterrupt
  3. 装饰器handle_ctrl_c可用于装饰应在输入 Ctrl-C 时立即退出的多处理函数和方法。这个装饰器将测试是否ctrl_c_entered设置了全局标志,如果设置了,甚至不需要运行函数/方法,而是返回一个KeyboardInterrupt异常实例。否则,将为 a 建立一个 try/catch 处理程序,KeyboardInterrupt并调用装饰的函数/方法。如果输入 Ctrl-C,则 globalctrl_c_entered将被设置为True并返回一个KeyboardInterrupt异常实例。无论如何,在返回之前装饰器将重新建立 SIG_IGN 处理程序。

本质上,所有提交的任务都将被允许启动,但KeyBoardInterrupt一旦输入了 Ctrl-C,将立即终止并返回异常值。主进程可以测试返回值是否存在这样的返回值来检测是否输入了Ctrl-C。

from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

def main():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(slowly_square, range(40))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    else:
        print(results)

if __name__ == '__main__':
    main()
于 2021-08-07T19:14:55.140 回答
-5

奇怪的是,看起来你也必须处理KeyboardInterrupt孩子们的问题。我本来希望这能按书面形式工作...尝试更改slowly_square为:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

这应该可以按您的预期工作。

于 2009-09-11T00:26:22.800 回答