
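What I would like to do is share a dictionary between subclasses of Process, so that when one process updates the dictionary, the other is notified to use it. This is illustrated in the code below, where MyProducer starts filling the dictionary and, on every iteration, triggers an event to notify MyConsumer to process the dictionary. Everything works apart from the part where the dictionary in MyConsumer is empty...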

from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    increment = 0
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event
    
    def run(self):
        while self.increment < 20:
            self.dictionary[self.increment]=self.increment+10
            self.increment = self.increment + 1
            print("From producer: ", self.dictionary)
            self.event.set()
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment + 1
        
class MyConsumer(Process):
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event
        
    
    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.dictionary)
            self.event.clear()
            

            
if __name__ == "__main__":

    with Manager() as manager:
        state_dict = manager.dict()
        state_ready = Event()
        producerprocess = MyProducer(state_dict, state_ready)
        consumerprocess = MyConsumer(state_dict, state_ready)
        producerprocess.start()
        consumerprocess.start()    

The output is:

Process MyProducer-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "main.py", line 13, in run
    self.dictionary[self.increment]=self.increment+10
  File "<string>", line 2, in __setitem__
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
    self._connect()
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 630, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

UPDATE

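My goal is to understand why a dictionary doesn't work with Process subclasses. I know about all the working cases you can find on the internet. In fact, I have a good solution: just replace the dict with a queue. I want to understand why the dict doesn't work.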

from multiprocessing import Process, Queue, Event

class MyProducer(Process):
    increment = 0
    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event
    
    def run(self):
        while self.increment < 20:
            self.queue.put([self.increment,self.increment+10])
            self.increment = self.increment + 1
            print("From producer: ", self.queue.qsize())
            self.event.set()
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment + 1
        
class MyConsumer(Process):
    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event
        
    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.queue.qsize())
            self.event.clear()
            

if __name__ == "__main__":
    state_queue = Queue()
    state_ready = Event()
    producerprocess = MyProducer(state_queue, state_ready)
    consumerprocess = MyConsumer(state_queue, state_ready)
    producerprocess.start()
    consumerprocess.start()


FYI, I see the same death in this simpler program:

from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        print("at producer start", self.val.value)
        self.val.value = 42
        self.event.set()

class MyConsumer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        self.event.wait()
        print("From consumer: ", self.val.value)
                        
if __name__ == "__main__":
    with Manager() as manager:
        state_value = manager.Value('i', 666)
        state_ready = Event()
        producerprocess = MyProducer(state_value, state_ready)
        consumerprocess = MyConsumer(state_value, state_ready)
        producerprocess.start()
        consumerprocess.start()

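This implies that, when any kind of object obtained from a Manager is attached as an attribute to an object that mp (multiprocessing) has to construct "by magic" in the worker process, it isn't usefully reconstructed. The information needed to connect to the Manager server process appears to be lost (be it a socket on Linux-y systems or a named pipe on Windows).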

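You could file a bug report, but until it's resolved there's nothing to be done except to rewrite the code not to use a Manager, or to pass Manager objects explicitly to functions.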

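A bug report could be resolved in one of two ways: (1) make it "work"; or, (2) change the code to raise an exception when an attempt is made to create such an object.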
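The second workaround mentioned above, passing Manager objects explicitly to functions, can be sketched roughly as follows. It assumes a plain module-level target function instead of a Process subclass, and the DictProxy is handed to the child through args; the names produce and fill_shared_dict are hypothetical. It also joins the child before the with block ends, which (per the WORKAROUND further down) turns out to be what really matters.

```python
import multiprocessing as mp


def produce(shared, n):
    # Runs in the child process; `shared` is a DictProxy passed in explicitly.
    for i in range(n):
        shared[i] = i + 10


def fill_shared_dict(n=5):
    with mp.Manager() as manager:
        shared = manager.dict()
        p = mp.Process(target=produce, args=(shared, n))
        p.start()
        # Wait for the child while the Manager's server process is still running.
        p.join()
        # Take a plain-dict snapshot before the with block shuts the Manager down.
        return shared.copy()


if __name__ == "__main__":
    print(fill_shared_dict())  # {0: 10, 1: 11, 2: 12, 3: 13, 4: 14}
```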

Another possibility (untried): if you're only running on Linux, you could skip the __name__ == "__main__" test and hope that the Manager connection info survives fork().

EDIT

I opened an issue on the Python project's tracker, here:

https://bugs.python.org/issue41660

WORKAROUND

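After fiddling with things in the Python issue report, the "problem" here appears to be not with how things are set up, but that your code ignores the need to shut down workers cleanly. Just adding this line at the end of your code (the dict version - the one you care about):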

    producerprocess.join()

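is enough that, on my box (Win 10, Python 3.8.5), it now produces the output you expect. However, it then hangs forever, because your consumer .wait()s forever on an Event that's never set again.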
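Applied to the question's Process-subclass structure, the fix looks roughly like the sketch below. It is trimmed to the producer only (no Event, no busy loop) so that it terminates; run_producer is a hypothetical helper name. One concrete mechanism worth knowing: leaving a with Manager() block calls the manager's shutdown(), which stops the server process every proxy talks to. Without the join, the main process reaches the end of the with block almost immediately, which is consistent with the producer's FileNotFoundError when its proxy tries to connect to the server's socket.

```python
import multiprocessing as mp


class MyProducer(mp.Process):
    def __init__(self, dictionary, n=20):
        super().__init__()
        self.dictionary = dictionary  # DictProxy created by the Manager
        self.n = n

    def run(self):
        for i in range(self.n):
            self.dictionary[i] = i + 10


def run_producer(n=20):
    with mp.Manager() as manager:
        state_dict = manager.dict()
        producerprocess = MyProducer(state_dict, n)
        producerprocess.start()
        # The fix: block here while the Manager's server is still alive.
        # Leaving the with block first would shut that server down.
        producerprocess.join()
        return state_dict.copy()


if __name__ == "__main__":
    print(run_producer(3))  # {0: 10, 1: 11, 2: 12}
```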

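My guess (which I'm 80% sure is correct): without the .join(), the main process goes on to start running interpreter shutdown code (there's nothing left for it to do!), and that starts forcibly destroying things the multiprocessing implementation still needs to function correctly.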

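With the .join(), the main process blocks until the producer is done. No shutdown code is started during that time, and .join() explicitly tells the producer process to cleanly shut down its part of the (elaborate!) multiprocessing dance.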

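It may leave the consumer process in a damaged state, but if so, we'll never see evidence of it, because the consumer is blocked forever in its self.event.wait().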

In a real program, you should make every effort to shut down the consumer process cleanly too.
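One way to do that while keeping the question's Event-based design is sketched below. This is a swapped-in alternative, not the answer's own method (the full program that follows uses a condition variable instead): a second Event, taken, acknowledges each mutation so the producer blocks instead of busy-looping, and a third Event, finished, tells the consumer to exit. The names taken, finished, log and run_pair are hypothetical; log, a manager list, only exists to record what the consumer saw.

```python
import multiprocessing as mp


class MyProducer(mp.Process):
    def __init__(self, dictionary, ready, taken, finished, n):
        super().__init__()
        self.dictionary = dictionary
        self.ready, self.taken, self.finished = ready, taken, finished
        self.n = n

    def run(self):
        for i in range(self.n):
            self.dictionary[i] = i + 10
            self.ready.set()   # a mutation is waiting for the consumer
            self.taken.wait()  # block (no busy loop) until it has been handled
            self.taken.clear()
        self.finished.set()    # no more mutations...
        self.ready.set()       # ...and wake the consumer so it notices


class MyConsumer(mp.Process):
    def __init__(self, dictionary, ready, taken, finished, log):
        super().__init__()
        self.dictionary = dictionary
        self.ready, self.taken, self.finished = ready, taken, finished
        self.log = log

    def run(self):
        while True:
            self.ready.wait()
            self.ready.clear()
            if self.finished.is_set():
                break  # clean exit instead of waiting forever
            self.log.append(len(self.dictionary))  # record the dict size seen
            self.taken.set()  # let the producer make its next mutation


def run_pair(n=5):
    with mp.Manager() as manager:
        state_dict = manager.dict()
        log = manager.list()
        ready, taken, finished = mp.Event(), mp.Event(), mp.Event()
        producer = MyProducer(state_dict, ready, taken, finished, n)
        consumer = MyConsumer(state_dict, ready, taken, finished, log)
        producer.start()
        consumer.start()
        # Join both workers while the Manager is still running.
        producer.join()
        consumer.join()
        return log[:]


if __name__ == "__main__":
    print(run_pair())  # [1, 2, 3, 4, 5]
```

Each acknowledgement is cleared only by the side that waited for it, so neither process can miss a wake-up, and both workers exit on their own before the Manager is shut down.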

FULL CODE

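Here's a complete program showing idiomatic Python and best practices for parallel programming: everything shuts down cleanly, with no "busy loops", no races, and no deadlocks. The implementation of State is more elaborate than this specific problem requires, but it illustrates a powerful approach well worth learning.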

import multiprocessing as mp

P, C, F = 1, 2, 4 # bit flags for state values

# Unusual synchronization appears to be wanted here:
# After a producer makes a mutation, it must not make another
# before the consumer acts on it.  So we'll say we're in state
# P when the producer is allowed to mutate, and in state C
# when there's a mutation for the consumer to process.  Another
# state - F (for "finished") - tells the consumer it's time to
# quit. The producer stops on its own when it gets tired of
# mutating ;-)
class State:
    def __init__(self):
        # Initial state is empty - everyone is blocked.
        # Note that we do our own locking around the shared
        # memory, via the condition variable's mutex, so
        # it would be pure waste for the Value to have
        # its own lock too.
        self.state = mp.Value('B', 0, lock=False)
        self.changed = mp.Condition()

    # Wait for state to change to one of the states in the
    # flag mask `what`.  Return the bit flag of the state
    # that succeeded.
    def waitfor(self, what):
        with self.changed:
            while not (self.state.value & what):
                self.changed.wait()
            return self.state.value

    # Force state to (bit flag) `what`, and notify waiters
    # to wake up and see whether it's the state they're
    # waiting for.
    def setwhat(self, what):
        with self.changed:
            self.state.value = what
            self.changed.notify_all()

class Base(mp.Process):
    def __init__(self, dictionary, state):
        super().__init__()
        self.dictionary = dictionary
        self.state = state

class MyProducer(Base):
    def __init__(self, *args):
        super().__init__(*args)
        self.increment = 0

    def run(self):
        while self.increment < 20:
            self.state.waitfor(P)
            self.dictionary[self.increment] = self.increment + 10
            self.state.setwhat(C)
            # Whether the producer or the consumer prints the dict
            # first isn't forced - and, indeed, they can both print at
            # the same time, producing garbled output.  Move the
            # print() above the setwhat(C) to force the producer
            # to print first, if desired.
            print("From producer: ", self.dictionary)
            self.increment += 1

class MyConsumer(Base):
    def run(self):
        while self.state.waitfor(C | F) != F:
            print("From consumer: ", self.dictionary)
            self.state.setwhat(P)

def main():
    with mp.Manager() as manager:
        state_dict = manager.dict()
        state_state = State()
        producerprocess = MyProducer(state_dict, state_state)
        consumerprocess = MyConsumer(state_dict, state_state)
        producerprocess.start()
        consumerprocess.start()

        # The producer is blocked waiting for state P, and the
        # consumer is blocked waiting for state C (or F). The
        # loop here counts down 5 seconds, so you can verify
        # by eyeball that the waits aren't "busy" (they consume
        # essentially no CPU cycles).
        import time
        for i in reversed(range(5)):
            time.sleep(1)
            print(i)

        state_state.setwhat(P) # tell the producer to start!
        producerprocess.join() # and wait for it to finish
        # wait for the consumer to finish eating the last mutation
        state_state.waitfor(P)
        # tell the consumer we're all done
        state_state.setwhat(F)
        consumerprocess.join()    

if __name__ == "__main__":
    main()
answered 2020-08-28T21:53:08.800