40

我正在尝试从 multiprocessing.Process 中获取回溯对象。不幸的是,通过管道传递异常信息不起作用,因为无法腌制回溯对象:

def foo(pipe_to_parent):
    try:
        raise Exception('xxx')
    except:
        pipe_to_parent.send(sys.exc_info())

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print traceback.format_exception(*exc_info)
to_child.close()
to_self.close()

追溯:

Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/process.py", line 231, in _bootstrap
    self.run()
  File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
    self._target(*self._args, **self._kwargs)
  File "foo", line 7, in foo
    to_parent.send(sys.exc_info())
PicklingError: Can't pickle <type 'traceback'>: attribute lookup __builtin__.traceback failed

还有其他方法可以访问异常信息吗?我想避免传递格式化的字符串。

4

6 回答 6

35

使用tblib您可以传递包装的异常并在以后重新引发它们:

import tblib.pickling_support
tblib.pickling_support.install()

from multiprocessing import Pool
import sys


class ExceptionWrapper(object):

    def __init__(self, ee):
        self.ee = ee
        __, __, self.tb = sys.exc_info()

    def re_raise(self):
        raise self.ee.with_traceback(self.tb)
        # for Python 2 replace the previous line by:
        # raise self.ee, None, self.tb


# example of how to use ExceptionWrapper

def inverse(i):
    """ will fail for i == 0 """
    try:
        return 1.0 / i
    except Exception as e:
        return ExceptionWrapper(e)


def main():
    p = Pool(1)
    results = p.map(inverse, [0, 1, 2, 3])
    for result in results:
        if isinstance(result, ExceptionWrapper):
            result.re_raise()


if __name__ == "__main__":
    main()

因此,如果您在远程进程中捕获异常,请将其包装起来ExceptionWrapper,然后将其传回。调用re_raise()主进程将完成这项工作。

于 2014-09-29T09:13:55.380 回答
30

由于multiprocessing确实会打印子进程中引发的异常的字符串内容,因此您可以将所有子进程代码包装在 try-except 中,以捕获任何异常,格式化相关的堆栈跟踪,并引发一个Exception在其字符串中保存所有相关信息的 new :

我使用的函数示例multiprocessing.map

def run_functor(functor):
    """
    Given a no-argument functor, run it and return its result. We can 
    use this with multiprocessing.map and map it over a list of job 
    functors to do them.

    Handles getting more than multiprocessing's pitiful exception output
    """

    try:
        # This is where you do your actual work
        return functor()
    except:
        # Put all exception text into an exception and raise that
        raise Exception("".join(traceback.format_exception(*sys.exc_info())))

您得到的是一个堆栈跟踪,其中包含另一个格式化的堆栈跟踪作为错误消息,这有助于调试。

于 2013-05-17T22:33:34.413 回答
14

似乎很难使回溯对象变得可腌制。但是您只能使用traceback.extract_tbsys.exc_info()方法发送 2 的第一个项目和一个预先格式化的回溯信息:

import multiprocessing
import sys
import traceback

def foo(pipe_to_parent):
    try:
        raise Exception('xxx')
    except:
        except_type, except_class, tb = sys.exc_info()
        pipe_to_parent.send((except_type, except_class, traceback.extract_tb(tb)))

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print exc_info
to_child.close()
to_self.close()

这给了你:

(<type 'exceptions.Exception'>, Exception('xxx',), [('test_tb.py', 7, 'foo', "raise Exception('xxx')")])

然后,您将能够获取有关异常原因的更多信息(文件名、引发异常的行号、方法名称和引发异常的语句)

于 2011-05-25T15:30:39.327 回答
10

蟒蛇 3

在 Python 3 中,现在返回完整回溯的get方法,请参见http://bugs.python.org/issue13831multiprocessing.pool.Async

蟒蛇2

使用traceback.format_exc(表示格式化的 expetion)来获取回溯字符串。制作如下装饰器会更方便。

def full_traceback(func):
    import traceback, functools
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            msg = "{}\n\nOriginal {}".format(e, traceback.format_exc())
            raise type(e)(msg)
    return wrapper

例子:

def func0():
    raise NameError("func0 exception")

def func1():
    return func0()

# Key is here!
@full_traceback
def main(i):
    return func1()

if __name__ == '__main__':
    from multiprocessing import Pool
    pool = Pool(4)
    try:
        results = pool.map_async(main, range(5)).get(1e5)
    finally:
        pool.close()
        pool.join()

装饰器回溯:

Traceback (most recent call last):
  File "bt.py", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: Exception in func0

Original Traceback (most recent call last):
  File "bt.py", line 13, in wrapper
    return func(*args, **kwargs)
  File "bt.py", line 27, in main
    return func1()
  File "bt.py", line 23, in func1
    return func0()
  File "bt.py", line 20, in func0
    raise NameError("Exception in func0")
NameError: Exception in func0

没有装饰器的回溯:

Traceback (most recent call last):
  File "bt.py", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: Exception in func0
于 2017-04-05T06:29:56.787 回答
4

这是这个优秀答案的变体。两者都依赖tblib来存储回溯。

但是,不必返回异常对象(如 OP 所要求的那样),该worker函数可以保持原样,只需包装在try/except中以存储异常以重新引发。

import tblib.pickling_support
tblib.pickling_support.install()

import sys

class DelayedException(Exception):

    def __init__(self, ee):
        self.ee = ee
        __,  __, self.tb = sys.exc_info()
        super(DelayedException, self).__init__(str(ee))

    def re_raise(self):
        raise self.ee, None, self.tb

例子

def worker():
    try:
        raise ValueError('Something went wrong.')
    except Exception as e:
        raise DelayedException(e)


if __name__ == '__main__':

    import multiprocessing

    pool = multiprocessing.Pool()
    try:
        pool.imap(worker, [1, 2, 3])
    except DelayedException as e:
        e.re_raise()
于 2017-02-02T10:53:32.677 回答
0

与@Syrtis Major@interfect相同的解决方案,但使用 Python 3.6 进行了测试:

import sys
import traceback
import functools

def catch_remote_exceptions(wrapped_function):
    """ https://stackoverflow.com/questions/6126007/python-getting-a-traceback """

    @functools.wraps(wrapped_function)
    def new_function(*args, **kwargs):
        try:
            return wrapped_function(*args, **kwargs)

        except:
            raise Exception( "".join(traceback.format_exception(*sys.exc_info())) )

    return new_function

用法:

class ProcessLocker(object):
    @catch_remote_exceptions
    def __init__(self):
        super().__init__()

    @catch_remote_exceptions
    def create_process_locks(self, total_processes):
        self.process_locks = []
        # ...
于 2019-09-06T18:49:31.843 回答