5

提前抱歉,这会很长......

可能相关:

Python 多处理 atexit 错误“atexit._run_exitfuncs 中的错误”

绝对相关:

具有全局数据的python并行映射(multiprocessing.Pool.map)

python的多处理池的键盘中断

这是我一起破解的“简单”脚本来说明我的问题......

import time
import multiprocessing as multi
import atexit

cleanup_stuff=multi.Manager().list([])

##################################################
# Some code to allow keyboard interrupts  
##################################################
was_interrupted=multi.Manager().list([])
class _interrupt(object):
    """
    Toy class to allow retrieval of the interrupt that triggered it's execution
    """
    def __init__(self,interrupt):
        self.interrupt=interrupt

def interrupt():
    was_interrupted.append(1)

def interruptable(func):
    """
    decorator to allow functions to be "interruptable" by
    a keyboard interrupt when in python's multiprocessing.Pool.map
    **Note**, this won't actually cause the Map to be interrupted,
    It will merely cause the following functions to be not executed.
    """
    def newfunc(*args,**kwargs):
        try:
            if(not was_interrupted):
                return func(*args,**kwargs)
            else:
                return False
        except KeyboardInterrupt as e:
            interrupt()
            return _interrupt(e)  #If we really want to know about the interrupt...
    return newfunc

@atexit.register
def cleanup():
    for i in cleanup_stuff:
        print(i)
    return

@interruptable
def func(i):
    print(i)
    cleanup_stuff.append(i)
    time.sleep(float(i)/10.)
    return i

#Must wrap func here, otherwise it won't be found in __main__'s dict
#Maybe because it was created dynamically using the decorator?
def wrapper(*args):
    return func(*args)


if __name__ == "__main__":

    #This is an attempt to use signals -- I also attempted something similar where
    #The signals were only caught in the child processes...Or only on the main process...
    #
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)

    #Try 2 with signals (only catch signal on main process)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)
    #def startup(): signal.signal(signal.SIGINT,signal.SIG_IGN)
    #p=multi.Pool(processes=4,initializer=startup)

    #Try 3 with signals (only catch signal on child processes)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,signal.SIG_IGN)
    #def startup(): signal.signal(signal.SIGINT,onSigInt)
    #p=multi.Pool(processes=4,initializer=startup)


    p=multi.Pool(4)
    try:
        out=p.map(wrapper,range(30))
        #out=p.map_async(wrapper,range(30)).get()  #This doesn't work either...

        #The following lines don't work either
        #Effectively trying to roll my own p.map() with p.apply_async 
        # results=[p.apply_async(wrapper,args=(i,)) for i in range(30)]
        # out = [ r.get() for r in results() ]
    except KeyboardInterrupt:
        print ("Hello!")
        out=None
    finally:
        p.terminate()
        p.join()

    print (out)

如果没有引发 KeyboardInterrupt,这工作得很好。但是,如果我提出一个,则会发生以下异常:

10
7
9
12
^CHello!
None
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
   c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
   s.connect(address)
  File "<string>", line 1, in connect
error: [Errno 2] No such file or directory
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
    c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
    s.connect(address)
  File "<string>", line 1, in connect
socket.error: [Errno 2] No such file or directory

有趣的是,代码确实退出了 Pool.map 函数而不调用任何其他函数......问题似乎是 KeyboardInterrupt 在某些时候没有正确处理,但它在哪里有点令人困惑,并且为什么不以可中断方式处理它。谢谢。

注意,如果我使用同样的问题 out=p.map_async(wrapper,range(30)).get()

编辑 1

更接近一点......如果我将它包含out=p.map(...)在一个try,except,finally子句中,它会消除第一个异常......但是,其他异常仍然在 atexit 中提出。上面的代码和回溯已经更新。

编辑 2

其他不起作用的内容已作为注释添加到上面的代码中。(同样的错误)。这次尝试的灵感来自:

http://jessenoller.com/2009/01/08/multiprocessingpool-and-keyboardinterrupt/

编辑 3

使用添加到上述代码中的信号的另一次尝试失败。

编辑 4

我已经想出了如何重组我的代码,以便不再需要上述内容。如果(不太可能)有人偶然发现这个线程与我有相同的用例,我将描述我的解决方案......

用例

我有一个使用tempfile模块生成临时文件的函数。我希望在程序退出时清理这些临时文件。我最初的尝试是将每个临时文件名打包到一个列表中,然后使用通过atexit.register. 问题是更新的列表没有跨多个进程更新。这就是我想到使用multiprocessing.Manager它来管理列表数据的地方。不幸的是,KeyboardInterrupt无论我多么努力,这都失败了,因为进程之间的通信套接字由于某种原因被破坏了。这个问题的解决方案很简单。在使用多处理之前,设置临时文件目录......类似tempfile.tempdir=tempfile.mkdtemp()然后注册一个函数来删除临时目录。每个进程都写入同一个临时目录,因此它可以工作。 当然,此解决方案仅适用于共享数据是在程序生命周期结束时需要删除的文件列表的情况。

4

0 回答 0