太长; 没读
warnings.catch_warnings()
上下文管理器不是线程安全的。如何在并行处理环境中使用它?
背景
下面的代码使用 Pythonmultiprocessing
模块的并行处理解决了最大化问题。它需要一个(不可变的)小部件列表,对它们进行分区(请参阅Python 3 中大规模、蛮力最大化的高效多处理),找到所有分区的最大值(“决赛选手”),然后找到最大值(“冠军” ) 那些“决赛选手”。如果我正确理解了自己的代码(如果我理解了我就不会在这里),我将与所有子进程共享内存以向它们提供输入小部件,并multiprocessing
使用操作系统级管道和酸洗来发送工作人员完成后,入围小部件返回主流程。
问题的根源
我想捕捉在小部件从进程间管道出来时发生的 unpickling 之后由小部件重新实例化引起的冗余小部件警告。当小部件对象实例化时,它们验证自己的数据,从 Python 标准warnings
模块发出警告,告诉应用程序的用户小部件怀疑用户的输入数据存在问题。因为 unpickling 会导致对象实例化,所以我对代码的理解意味着每个小部件对象仅被重新实例化一次,当且仅当它从管道中出来后进入决赛 - 请参阅下一节以了解为什么这是不正确的.
这些小部件在被 fronicated 之前已经创建,因此用户已经痛苦地意识到他输入错误并且不想再听到它。这些是我想用warnings
模块的catch_warnings()
上下文管理器(即with
语句)捕获的警告。
失败的解决方案
在我的测试中,当多余的警告被发送到我在下面标记为Line A和Line B之间的任何地方时,我已经缩小了范围。令我惊讶的是,警告正在发出的地方不仅仅是 near output_queue.get()
。这对我来说意味着multiprocessing
使用酸洗将小部件发送给工人。
结果是,将一个上下文管理器放置在warnings.catch_warnings()
从A行到B行的所有内容周围,并在此上下文中设置正确的警告过滤器不会捕获警告。这对我来说意味着警告正在工作进程中发出。将此上下文管理器放在工作代码周围也不会捕获警告。
编码
此示例省略了用于确定问题规模是否太小而无法处理派生进程、导入多处理以及定义my_frobnal_counter
和的代码my_load_balancer
。
"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"
def frobnicate_parallel_worker(widgets, output_queue):
resultant_widget = max(widgets, key=my_frobnal_counter)
output_queue.put(resultant_widget)
def frobnicate_parallel(widgets):
output_queue = multiprocessing.Queue()
# partitions: Generator yielding tuples of sets
partitions = my_load_balancer(widgets)
processes = []
# Line A: Possible start of where the warnings are coming from.
for partition in partitions:
p = multiprocessing.Process(
target=frobnicate_parallel_worker,
args=(partition, output_queue))
processes.append(p)
p.start()
finalists = []
for p in processes:
finalists.append(output_queue.get())
# Avoid deadlocks in Unix by draining queue before joining processes
for p in processes:
p.join()
# Line B: Warnings no longer possible after here.
return max(finalists, key=my_frobnal_counter)