20

我有一组任务要并行执行,但在它们结束时,我需要知道是否有任何线程引发了异常。我不需要直接处理异常,我只需要知道其中一个线程是否因异常而失败,这样我就可以干净地终止脚本

这是一个简单的例子:

#!/usr/bin/python

from time import sleep
from threading import Thread

def func(a):
    for i in range(0,5):
        print a
        sleep(1)

def func_ex():
    sleep(2)
    raise Exception("Blah")


x = [Thread(target=func, args=("T1",)), Thread(target=func, args=("T2",)), Thread(target=func_ex, args=())]

print "Starting"
for t in x:
    t.start()

print "Joining"
for t in x:
    t.join()


print "End"

在“结束”之前,我想遍历线程,看看是否有失败,然后决定是否可以继续执行脚本,或者此时是否需要退出。

我不需要拦截异常或停止其他线程,我只需要知道最后是否有任何失败。

4

3 回答 3

12

join()线程调用返回时,线程的堆栈已经展开,所有有关异常的信息都已丢失。因此,不幸的是,您需要提供自己的异常注册机制;这里讨论了一些技术。

于 2013-08-21T06:33:36.563 回答
3

对于不需要处理异常的情况,一种简单的技术是使用全局列表并在其上附加相关信息。你的代码会变成这样:

#!/usr/bin/python

from time import sleep
from threading import Thread, current_thread #needed to get thread name or whatever identifying info you need

threadErrors = [] #global list

def func(a):
    for i in range(0,5):
        print a
        sleep(1)

def func_ex():
    global threadErrors #if you intend to change a global variable from within a different scope it has to be declared
    try:
        sleep(2)
        raise Exception("Blah")
    except Exception, e:
        threadErrors.append([repr(e), current_thread.name]) #append a list of info
        raise #re-raise the exception or use sys.exit(1) to let the thread die and free resources 

x = [Thread(target=func, args=("T1",)), Thread(target=func, args=("T2",)), Thread(target=func_ex, args=())]

print "Starting"
for t in x:
    t.start()

print "Joining"
for t in x:
    t.join()

if len(threadErrors) > 0: #check if there are any errors 
    for e in threadErrors:
        print(threadErrors[e][0]+' occurred in thread: '+threadErrors[e][1])
        #do whatever with each error info
else: 
    #there are no errors so do normal clean-up stuff

#do clean-up that should happen in either case here

print "End"

注意:全局变量通常被认为是不好的技术,它们是线程之间通信的简单机制。您只需要记住,如果一个线程通过此路由发送信息,则另一个线程必须正在寻找它。

于 2016-02-24T22:03:00.613 回答
1

如果您要在测试中执行此操作,我建议您使用pytest-reraise 使用它,您可以执行以下操作:

def test_assert(reraise):

def run():
    with reraise:
        assert False

reraise() # This will not raise anything yet

t = Thread(target=run)
t.start()
t.join()

reraise() # This will raise the assertion error
于 2020-10-02T09:04:36.347 回答