我正在使用 theasyncio
和multiprocessing
library 来运行两个进程,每个进程都有一个服务器实例在不同的端口上侦听传入消息。
为了识别每个客户端,我想dict
在两个进程之间共享一个来更新已知客户端的列表。为了实现这一点,我决定使用一个查找键,它为此连接Tuple[StreamReader, StreamWriter]
分配了一个对象。Client
但是,只要我插入或简单地访问共享字典,程序就会崩溃并显示以下错误消息:
Task exception was never retrieved
future: <Task finished name='Task-5' coro=<GossipServer.handle_client() done, defined at /home/croemheld/Documents/network/server.py:119> exception=AttributeError("Can't pickle local object 'WeakSet.__init__.<locals>._remove'")>
Traceback (most recent call last):
File "/home/croemheld/Documents/network/server.py", line 128, in handle_client
if not await self.handle_message(reader, writer, buffer):
File "/home/croemheld/Documents/network/server.py", line 160, in handle_message
client = self.syncmanager.get_api_client((reader, writer))
File "<string>", line 2, in get_api_client
File "/usr/lib/python3.9/multiprocessing/managers.py", line 808, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib/python3.9/multiprocessing/connection.py", line 211, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'
我自然查了一下错误信息,发现了这个问题,但我真的不明白这是什么原因。据我了解,此崩溃的原因是StreamReader
并且StreamWriter
不能被腌制/序列化以便在进程之间共享。如果这实际上是原因,有没有办法腌制它们,也许通过修补减速器功能来代替使用不同的腌制器?