13

我正在使用 Python 2.7.3。我已经使用子类multiprocessing.Process对象并行化了一些代码。如果我的子类 Process 对象中的代码没有错误,那么一切运行正常。但是,如果我的子类 Process 对象中的代码有错误,它们显然会默默地崩溃(没有堆栈跟踪打印到父 shell)并且 CPU 使用率将下降到零。父代码永远不会崩溃,给人的印象是执行只是挂起。同时,很难找出代码中的错误在哪里,因为没有给出错误在哪里的指示。

我在 stackoverflow 上找不到任何其他处理相同问题的问题。

我猜想子类 Process 对象似乎会默默地崩溃,因为它们无法将错误消息打印到父的 shell,但我想知道我能做些什么,以便我至少可以更有效地调试(以及其他我的代码的用户也可以在遇到问题时告诉我)。

编辑:我的实际代码太复杂了,但是一个带有错误的子类 Process 对象的简单示例将是这样的:

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)
4

3 回答 3

18

您真正想要的是某种将异常传递给父进程的方法,对吗?然后你可以随心所欲地处理它们。

如果您使用concurrent.futures.ProcessPoolExecutor,这是自动的。如果你使用multiprocessing.Pool,它是微不足道的。如果你使用显式Processand Queue,你必须做一些工作,但不是那么多。

例如:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put(result)
    except Exception as e:
        self.outputQueue.put(e)

然后,您的调用代码可以Exception像其他任何内容一样从队列中读取 s。而不是这个:

yield outq.pop()

做这个:

result = outq.pop()
if isinstance(result, Exception):
    raise result
yield result

(我不知道你实际的父进程队列读取代码做了什么,因为你的最小样本只是忽略了队列。但希望这能解释这个想法,即使你的真实代码实际上并不是这样工作的。)

这假定您要中止任何未处理的异常,使其达到run. 如果您想传回异常并继续到下一个i in iter,只需将 移到tryfor而不是围绕它。

这也假设Exceptions 不是有效值。如果这是一个问题,最简单的解决方案就是推送(result, exception)元组:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put((result, None))
    except Exception as e:
        self.outputQueue.put((None, e))

然后,您的弹出代码执行以下操作:

result, exception = outq.pop()
if exception:
    raise exception
yield result

您可能会注意到这类似于 node.js 回调样式,您传递(err, result)给每个回调。是的,这很烦人,而且你会弄乱这种风格的代码。但是除了包装器之外,您实际上并没有在任何地方使用它;所有从队列中获取值或在内部调用的“应用程序级”代码run只会看到正常的返回/收益和引发的异常。

您甚至可能想要考虑构建一个Future符合规范的concurrent.futures(或按原样使用该类),即使您正在手动进行排队和执行工作。这并不难,它为您提供了一个非常好的 API,尤其是用于调试。

最后,值得注意的是,大多数围绕工作者和队列构建的代码可以通过执行器/池设计变得更简单,即使您绝对确定每个队列只需要一个工作者。只需废弃所有样板,并将Worker.run方法中的循环转换为函数(正常情况下只是returns 或raises,而不是附加到队列中)。在调用方,再次废弃所有样板文件和只是submitmap带有参数的作业函数。

您的整个示例可以简化为:

def job(i):
    # (code that does stuff)
    1 / 0 # Dumb error
    # (more code that does stuff)
    return result

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(job, range(10))

它会自动正确处理异常。


正如您在评论中提到的,异常的回溯不会回溯到子进程;它只涉及手动raise result调用(或者,如果您使用的是池或执行器,则池或执行器的胆量)。

原因是它multiprocessing.Queue建立在 之上pickle,并且腌制异常不会腌制它们的回溯。原因是你不能腌制回溯。原因是回溯中充满了对本地执行上下文的引用,因此让它们在另一个进程中工作将非常困难。

那么……你能做些什么呢?不要去寻找一个完全通用的解决方案。相反,想想你真正需要什么。90% 的情况下,您想要的是“记录异常,使用回溯并继续”或“打印异常,使用回溯,到stderr默认exit(1)的未处理异常处理程序”。对于其中任何一个,您根本不需要传递异常;只需在子端格式化它并传递一个字符串。如果您确实需要更花哨的东西,请准确计算出您需要什么,并传递足够的信息来手动将它们组合在一起。如果您不知道如何格式化回溯和异常,请参阅traceback模块。这很简单。这意味着您根本不需要进入泡菜机械。(不是那个'copyreg一个pickler或用一个__reduce__方法或任何东西编写一个持有者类,但如果你不需要,为什么要学习所有这些?)

于 2013-03-21T00:08:46.293 回答
2

我建议这样的解决方法来显示进程的异常

from multiprocessing import Process
import traceback


run_old = Process.run

def run_new(*args, **kwargs):
    try:
        run_old(*args, **kwargs)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        traceback.print_exc(file=sys.stdout)

Process.run = run_new
于 2015-09-09T16:51:52.817 回答
1

这不是答案,只是扩展评论。请运行这个程序并告诉我们你得到了什么输出(如果有的话):

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)

if __name__ == '__main__':
    inq, outq = Queue(), Queue()
    inq.put(1)
    inq.put('STOP')
    w = Worker(inq, outq)
    w.start()

我得到:

% test.py
Process Worker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/unutbu/pybin/test.py", line 21, in run
    1 / 0 # Dumb error
ZeroDivisionError: integer division or modulo by zero

我很惊讶(如果)你什么也没得到。

于 2013-03-20T23:57:30.180 回答