假设我们有一个主线程,它为测试模块启动两个线程——“test_a”和“test_b”。两个测试模块线程都保持它们的状态,无论它们是否已完成执行测试,或者它们是否遇到任何错误、警告或它们是否想要更新一些其他信息。
主线程如何访问这些信息并采取相应的行动。例如,如果“test_a”引发错误标志;在出现错误之前,“main”如何知道并停止其余的测试?
一种方法是使用全局变量,但这会变得非常难看..很快。
假设我们有一个主线程,它为测试模块启动两个线程——“test_a”和“test_b”。两个测试模块线程都保持它们的状态,无论它们是否已完成执行测试,或者它们是否遇到任何错误、警告或它们是否想要更新一些其他信息。
主线程如何访问这些信息并采取相应的行动。例如,如果“test_a”引发错误标志;在出现错误之前,“main”如何知道并停止其余的测试?
一种方法是使用全局变量,但这会变得非常难看..很快。
显而易见的解决方案是共享某种可变变量,方法是在构造函数/启动时将其传递给线程对象/函数。
做到这一点的干净方法是构建一个具有适当实例属性的类。如果您使用的是threading.Thread
子类,而不仅仅是线程函数,您通常可以使用子类本身作为粘贴这些属性的地方。但我会用 a 来展示它,list
因为它更短:
def test_a_func(thread_state):
# ...
thread_state[0] = my_error_state
# ...
def main_thread():
test_states = [None]
test_a = threading.Thread(target=test_a_func, args=(test_states,))
test_a.start()
您也可以(并且通常希望)将Lock
or打包Condition
到可变状态对象中,这样您就可以在main_thread
and之间正确同步test_a
。
(另一种选择是使用 a queue.Queue
、 anos.pipe
等来传递信息,但您仍然需要将该队列或管道传递给子线程 - 您可以按照与上述完全相同的方式执行此操作。)
但是,值得考虑您是否真的需要这样做。如果您将test_a
andtest_b
视为“作业”,而不是“线程函数”,您可以在池上执行这些作业,并让池处理传递的结果或错误。
例如:
try:
with concurrent.futures.ThreadPoolExecutor(workers=2) as executor:
tests = [executor.submit(job) for job in (test_a, test_b)]
for test in concurrent.futures.as_completed(tests):
result = test.result()
except Exception as e:
# do stuff
现在,如果test_a
函数引发异常,主线程将获得该异常——并且,因为这意味着退出with
块,所有其他作业都被取消并丢弃,并且工作线程关闭。
如果你使用的是 2.5-3.1,你没有concurrent.futures
内置,但你可以安装PyPI 的 backport,或者你可以重写周围的东西multiprocessing.dummy.Pool
。(这种方式稍微复杂一些,因为您必须创建一系列作业并调用map_async
以取回AsyncResult
对象的迭代器……但实际上这仍然非常简单。)