23

我一直在寻找一个简单的python线程池模式的良好实现,但真的找不到任何适合我需要的东西。我正在使用 python 2.7 并且我发现的所有模块都不起作用,或者没有正确处理工作人员中的异常。我想知道是否有人知道可以提供我正在搜索的功能类型的库。非常感谢帮助。

多处理

我的第一次尝试是使用内置multiprocessing模块,但由于它不使用线程而是使用子进程,我们遇到了无法腌制对象的问题。不去这里。

from multiprocessing import Pool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = Pool(processes=8)
for s in samples: pool.apply_async(s.compute_fib, [20])
pool.join()
for s in samples: print s.fib

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

期货

所以我看到这里有 python 3.2 的一些很酷的并发特性的后端口。这似乎完美且易于使用。问题是,当您在其中一名工作人员中遇到异常时,您只能获得异常类型,例如“ZeroDivisionError”,但没有回溯,因此没有指示哪一行导致了异常。代码变得无法调试。不去。

from concurrent import futures

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = futures.ThreadPoolExecutor(max_workers=8)
threads = [pool.submit(s.compute_fib, 20) for s in samples]
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in threads: t.result()
for s in samples: print s.fib


#    futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
#    354     def __get_result(self):
#    355         if self._exception:
#--> 356             raise self._exception
#    357         else:
#    358             return self._result
#
# ZeroDivisionError: integer division or modulo by zero

工人池

我在这里找到了这种模式的另一个实现。这次发生异常时,它会被打印出来,但是我的 ipython 交互式解释器处于挂起状态,需要从另一个 shell 中终止。不去。

import workerpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = workerpool.WorkerPool(size=8)
for s in samples: pool.map(s.compute_fib, [20])
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ^C^C^C^C^C^C^C^C^D^D
# $ kill 1783

线程池

这里还有另一个实现。这次发生异常时,它会打印到 ,stderr但脚本不会中断,而是继续执行,这违背了异常的目的,并且会使事情变得不安全。还是不能用。

import threadpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = threadpool.ThreadPool(8)
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
requests = [y for x in requests for y in x]
for r in requests: pool.putRequest(r)
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
#---> 17 for s in samples: print s.fib
#
#AttributeError: 'Sample' object has no attribute 'fib'

- 更新 -

看来,关于futures库,python 3 的行为与 python 2 不同。

futures_exceptions.py

from concurrent.futures import ThreadPoolExecutor, as_completed

def div_zero(x):
    return x / 0

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = executor.map(div_zero, range(4))
    for future in as_completed(futures): print(future)

Python 2.7.6输出:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 12, in <module>
    for future in as_completed(futures):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
    with _AcquireFutures(fs):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
    self.futures = sorted(futures, key=id)
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
    yield future.result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
    return self.__get_result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
    raise self._exception
ZeroDivisionError: integer division or modulo by zero

Python 3.3.2输出:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 11, in <module>
    for future in as_completed(futures):
  File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
    with _AcquireFutures(fs):
  File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
    self.futures = sorted(futures, key=id)
  File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
    yield future.result()
  File "...python3.3/concurrent/futures/_base.py", line 392, in result
    return self.__get_result()
  File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
    raise self._exception
  File "...python3.3/concurrent/futures/thread.py", line 54, in run
    result = self.fn(*self.args, **self.kwargs)
  File "...futures_exceptions.py", line 7, in div_zero
    return x / 0
ZeroDivisionError: division by zero
4

5 回答 5

5

我个人使用concurrent.futures的界面非常简单。对于回溯问题,我找到了一种解决方法来保留它。查看我对另一个问题的回答:

在 concurrent.futures 中获取异常的原始行号

于 2014-06-28T11:09:07.033 回答
3

如果您想获取有关线程中未处理异常的信息并使用 ThreadPoolExecutor,您可以这样做:

import time
import traceback

from concurrent.futures import ThreadPoolExecutor


def worker():
    a = 2 / 0


def worker_callbacks(f):
    e = f.exception()

    if e is None:
        return

    trace = []
    tb = e.__traceback__
    while tb is not None:
        trace.append({
            "filename": tb.tb_frame.f_code.co_filename,
            "name": tb.tb_frame.f_code.co_name,
            "lineno": tb.tb_lineno
        })
        tb = tb.tb_next
    print(str({
        'type': type(e).__name__,
        'message': str(e),
        'trace': trace
    }))


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(worker).add_done_callback(worker_callbacks)
于 2021-04-07T20:47:01.167 回答
1

简单的解决方案:使用最适合您的任何替代方案,并try-except在您的工人中实施您自己的块。如果必须的话,围绕根呼叫。

我不会说这些库“错误地”处理异常。它们有一个默认行为,无论多么原始。如果默认设置不适合您,您应该自己处理。

于 2013-03-12T11:00:52.753 回答
0

对于像我这样遇到这个问题并正在使用的人threading.ThreadPool:如果您希望能够处理异常,apply_async并且map_async两者都有一个error_callback关键字参数,您可以将一个函数传递给它来处理发生的异常。

以防万一它帮助任何人!

于 2021-10-08T18:54:58.430 回答
0

受这里 se7entyse7en的回答的启发:

class NonSilentThreadPoolExecutor(ThreadPoolExecutor):
    def submit(self, fn, *args, **kwargs):
        # Submits the wrapped function instead of `fn`
        return super().submit(self._function_wrapper, fn, *args, **kwargs)

    def _function_wrapper(self, fn, *args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except BaseException as e:
            logger.exception(e) # or your way of dealing with any exceptions... 
            # is the following really necessary? 
            # raise sys.exc_info()[0](traceback.format_exc())
            raise e

...我个人不太明白行号有什么问题(se7entyse7en 试图在他的回答中解决):logger.exception(e)似乎打印出引发异常的精确行号,以及堆栈的其余部分痕迹貌似还行。

在他的回答中,se7entyse7en 正确地谈到了使用Futurefrom concurrent.futures。但是很多时候你会发现自己实际上调用线程(例如 PyQt5 中的 Gui 线程)不能等待result来自Future. 因此,如果您想等待该结果,则必须submit对该(或另一个)执行者执行另一项任务并等待Future那里的结果……但这会完全破坏对象,因为在等待时再次引发任何异常因为结果也会被默默吞下。

如果您等不及Future.

但至少您通常希望记录任何引发的异常:当在此并发上下文之外发生未处理的异常时,您总是按照标准将堆栈跟踪打印到控制台。因此,在某种程度上,我确实认为ThreadPoolExecutor该类至少应该允许您将“异常时打印堆栈跟踪”指定为 的可选参数submit,或者最好仍然包含可选的“on_exception”回调参数,如下所示:

class ExceptionAwareThreadPoolExecutor(ThreadPoolExecutor):
    def submit(self, fn, *args, on_exception='console', **kwargs):
        self.on_exception = on_exception
        return super().submit(self._function_wrapper, fn, *args, **kwargs)
        
    def _function_wrapper(self, fn, *args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except BaseException as e:
            if self.on_exception == 'console':
                # print stack to console:
                logging.error(f'Exception class {e.__class__.__name__} raised', exc_info=True)
            elif self.on_exception != None:
                self.on_exception(e)
            raise e

然后像这样调用以达到与上面相同的结果:

executor.submit(task_to_be_performed, on_exception=logger.exception)

或者这样,异常的堆栈跟踪将被打印到控制台(NB 也使它成为一个插件ThreadPoolExecutor,无需更改代码):

executor.submit(task_to_be_performed)

或抑制堆栈跟踪输出:

executor.submit(task_to_be_performed, on_exception=None)

logging.error(即使您没有设置记录器,NB也可以工作)

于 2021-12-24T09:38:45.553 回答