37

太长; 没读

warnings.catch_warnings()上下文管理器不是线程安全的。如何在并行处理环境中使用它?

背景

下面的代码使用 Pythonmultiprocessing模块的并行处理解决了最大化问题。它需要一个(不可变的)小部件列表,对它们进行分区(请参阅Python 3 中大规模、蛮力最大化的高效多处理),找到所有分区的最大值(“决赛选手”),然后找到最大值(“冠军” ) 那些“决赛选手”。如果我正确理解了自己的代码(如果我理解了我就不会在这里),我将与所有子进程共享内存以向它们提供输入小部件,并multiprocessing使用操作系统级管道和酸洗来发送工作人员完成后,入围小部件返回主流程。

问题的根源

我想捕捉在小部件从进程间管道出来时发生的 unpickling 之后由小部件重新实例化引起的冗余小部件警告。当小部件对象实例化时,它们验证自己的数据,从 Python 标准warnings模块发出警告,告诉应用程序的用户小部件怀疑用户的输入数据存在问题。因为 unpickling 会导致对象实例化,所以我对代码的理解意味着每个小部件对象仅被重新实例化一次,当且仅当它从管道中出来后进入决赛 - 请参阅下一节以了解为什么这是不正确的.

这些小部件在被 fronicated 之前已经创建,因此用户已经痛苦地意识到他输入错误并且不想再听到它。这些是我想用warnings模块的catch_warnings()上下文管理器(即with语句)捕获的警告。

失败的解决方案

在我的测试中,当多余的警告被发送到我在下面标记为Line ALine B之间的任何地方时,我已经缩小了范围。令我惊讶的是,警告正在发出的地方不仅仅是 near output_queue.get()。这对我来说意味着multiprocessing使用酸洗将小部件发送给工人。

结果是,将一个上下文管理器放置在warnings.catch_warnings()A行到B行的所有内容周围,并在此上下文中设置正确的警告过滤器不会捕获警告。这对我来说意味着警告正在工作进程中发出。将此上下文管理器放在工作代码周围也不会捕获警告。

编码

此示例省略了用于确定问题规模是否太小而无法处理派生进程、导入多处理以及定义my_frobnal_counter和的代码my_load_balancer

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)
4

3 回答 3

2

您可以尝试覆盖Process.run要使用的方法warnings.catch_warnings

>>> from multiprocessing import Process
>>> 
>>> def yell(text):
...    import warnings
...    print 'about to yell %s' % text
...    warnings.warn(text)
... 
>>> class CustomProcess(Process):
...    def run(self, *args, **kwargs):
...       import warnings
...       with warnings.catch_warnings():
...          warnings.simplefilter("ignore")
...          return Process.run(self, *args, **kwargs)
... 
>>> if __name__ == '__main__':
...    quiet = CustomProcess(target=yell, args=('...not!',))
...    quiet.start()
...    quiet.join()
...    noisy = Process(target=yell, args=('AAAAAAaaa!',))
...    noisy.start()
...    noisy.join()
... 
about to yell ...not!
about to yell AAAAAAaaa!
__main__:4: UserWarning: AAAAAAaaa!
>>> 

或者你可以使用一些内部结构......(__warningregistry__

>>> from multiprocessing import Process
>>> import exceptions
>>> def yell(text):
...    import warnings
...    print 'about to yell %s' % text
...    warnings.warn(text)
...    # not filtered
...    warnings.warn('complimentary second warning.')
... 
>>> WARNING_TEXT = 'AAAAaaaaa!'
>>> WARNING_TYPE = exceptions.UserWarning
>>> WARNING_LINE = 4
>>> 
>>> class SelectiveProcess(Process):
...    def run(self, *args, **kwargs):
...       registry = globals().setdefault('__warningregistry__', {})
...       registry[(WARNING_TEXT, WARNING_TYPE, WARNING_LINE)] = True
...       return Process.run(self, *args, **kwargs)
... 
>>> if __name__ == '__main__':
...    p = SelectiveProcess(target=yell, args=(WARNING_TEXT,))
...    p.start()
...    p.join()
... 
about to yell AAAAaaaaa!
__main__:6: UserWarning: complimentary second warning.
>>> 
于 2012-10-12T04:53:01.677 回答
2

unpickling 不会导致__init__执行两次。我在 Windows 上运行了以下代码,但没有发生(每个__init__都只运行一次)。

因此,您需要向我们提供来自my_load_balancer和来自小部件类的代码。在这一点上,您的问题根本没有提供足够的信息。

作为随机猜测,您可能会检查是否my_load_balancer复制了小部件,从而导致它们再次被实例化。

import multiprocessing
import collections

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def my_load_balancer(widgets):
    partitions = tuple(set() for _ in range(8))
    for i, widget in enumerate(widgets):
        partitions[i % 8].add(widget)
    for partition in partitions:
        yield partition

def my_frobnal_counter(widget):
    return widget.id

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)

class Widget:
    id = 0
    def __init__(self):
        print('initializing Widget {}'.format(self.id))
        self.id = Widget.id
        Widget.id += 1

    def __str__(self):
        return str(self.id)

    def __repr__(self):
        return str(self)

def main():

    widgets = [Widget() for _ in range(16)]
    result = frobnicate_parallel(widgets)
    print(result.id)


if __name__ == '__main__':
    main()
于 2012-10-12T06:08:34.417 回答
1

多年后,我终于有了一个解决方案(在处理一个不相关的问题时发现)。我已经在 Python 3.7、3.8 和 3.9 上对此进行了测试。

暂时用空列表打补丁sys.warnoptions[]。您只需要在调用process.start(). sys.warnoptions被记录为您不应手动修改的实现细节;官方建议是使用模块中的函数并warnings设置PYTHONWARNINGSos.environ. 这行不通。唯一似乎有效的方法是打补丁sys.warnoptions。在测试中,您可以执行以下操作:

import multiprocessing
from unittest.mock import patch
p = multiprocessing.Process(target=my_function)
with patch('sys.warnoptions', []):
    p.start()
p.join()

如果您不想使用unittest.mock,只需手动修补:

import multiprocessing
import sys
p = multiprocessing.Process(target=my_function)
old_warnoptions = sys.warnoptions
try:
    sys.warnoptions = []
    p.start()
finally:
    sys.warnoptions = old_warnoptions
p.join()
于 2020-10-21T23:15:26.047 回答