您真正想要的是某种将异常传递给父进程的方法,对吗?然后你可以随心所欲地处理它们。
如果您使用concurrent.futures.ProcessPoolExecutor
,这是自动的。如果你使用multiprocessing.Pool
,它是微不足道的。如果你使用显式Process
and Queue
,你必须做一些工作,但不是那么多。
例如:
def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
self.outputQueue.put(result)
except Exception as e:
self.outputQueue.put(e)
然后,您的调用代码可以Exception
像其他任何内容一样从队列中读取 s。而不是这个:
yield outq.pop()
做这个:
result = outq.pop()
if isinstance(result, Exception):
raise result
yield result
(我不知道你实际的父进程队列读取代码做了什么,因为你的最小样本只是忽略了队列。但希望这能解释这个想法,即使你的真实代码实际上并不是这样工作的。)
这假定您要中止任何未处理的异常,使其达到run
. 如果您想传回异常并继续到下一个i in iter
,只需将 移到try
,for
而不是围绕它。
这也假设Exception
s 不是有效值。如果这是一个问题,最简单的解决方案就是推送(result, exception)
元组:
def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
self.outputQueue.put((result, None))
except Exception as e:
self.outputQueue.put((None, e))
然后,您的弹出代码执行以下操作:
result, exception = outq.pop()
if exception:
raise exception
yield result
您可能会注意到这类似于 node.js 回调样式,您传递(err, result)
给每个回调。是的,这很烦人,而且你会弄乱这种风格的代码。但是除了包装器之外,您实际上并没有在任何地方使用它;所有从队列中获取值或在内部调用的“应用程序级”代码run
只会看到正常的返回/收益和引发的异常。
您甚至可能想要考虑构建一个Future
符合规范的concurrent.futures
(或按原样使用该类),即使您正在手动进行排队和执行工作。这并不难,它为您提供了一个非常好的 API,尤其是用于调试。
最后,值得注意的是,大多数围绕工作者和队列构建的代码可以通过执行器/池设计变得更简单,即使您绝对确定每个队列只需要一个工作者。只需废弃所有样板,并将Worker.run
方法中的循环转换为函数(正常情况下只是return
s 或raise
s,而不是附加到队列中)。在调用方,再次废弃所有样板文件和只是submit
或map
带有参数的作业函数。
您的整个示例可以简化为:
def job(i):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
return result
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
results = executor.map(job, range(10))
它会自动正确处理异常。
正如您在评论中提到的,异常的回溯不会回溯到子进程;它只涉及手动raise result
调用(或者,如果您使用的是池或执行器,则池或执行器的胆量)。
原因是它multiprocessing.Queue
建立在 之上pickle
,并且腌制异常不会腌制它们的回溯。原因是你不能腌制回溯。原因是回溯中充满了对本地执行上下文的引用,因此让它们在另一个进程中工作将非常困难。
那么……你能做些什么呢?不要去寻找一个完全通用的解决方案。相反,想想你真正需要什么。90% 的情况下,您想要的是“记录异常,使用回溯并继续”或“打印异常,使用回溯,到stderr
默认exit(1)
的未处理异常处理程序”。对于其中任何一个,您根本不需要传递异常;只需在子端格式化它并传递一个字符串。如果您确实需要更花哨的东西,请准确计算出您需要什么,并传递足够的信息来手动将它们组合在一起。如果您不知道如何格式化回溯和异常,请参阅traceback
模块。这很简单。这意味着您根本不需要进入泡菜机械。(不是那个'copyreg
一个pickler或用一个__reduce__
方法或任何东西编写一个持有者类,但如果你不需要,为什么要学习所有这些?)