1

我正在使用 theasynciomultiprocessinglibrary 来运行两个进程,每个进程都有一个服务器实例在不同的端口上侦听传入消息。

为了识别每个客户端,我想dict在两个进程之间共享一个来更新已知客户端的列表。为了实现这一点,我决定使用一个查找键,它为此连接Tuple[StreamReader, StreamWriter]分配了一个对象。Client

但是,只要我插入或简单地访问共享字典,程序就会崩溃并显示以下错误消息:

Task exception was never retrieved
future: <Task finished name='Task-5' coro=<GossipServer.handle_client() done, defined at /home/croemheld/Documents/network/server.py:119> exception=AttributeError("Can't pickle local object 'WeakSet.__init__.<locals>._remove'")>
Traceback (most recent call last):
  File "/home/croemheld/Documents/network/server.py", line 128, in handle_client
    if not await self.handle_message(reader, writer, buffer):
  File "/home/croemheld/Documents/network/server.py", line 160, in handle_message
    client = self.syncmanager.get_api_client((reader, writer))
  File "<string>", line 2, in get_api_client
  File "/usr/lib/python3.9/multiprocessing/managers.py", line 808, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'

我自然查了一下错误信息,发现了这个问题,但我真的不明白这是什么原因。据我了解,此崩溃的原因是StreamReader并且StreamWriter不能被腌制/序列化以便在进程之间共享。如果这实际上是原因,有没有办法腌制它们,也许通过修补减速器功能来代替使用不同的腌制器?

4

2 回答 2

0

您可能对使用SyncManager感兴趣。只要确保在最后调用关闭管理器,shutdown就不会留下僵尸进程。

from multiprocessing.managers import SyncManager
from multiprocessing import Process
import signal

my_manager = SyncManager()

# to avoid closing the manager by ctrl+C. be sure to handle KeyboardInterrupt errors and close the manager accordingly
def manager_init():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

my_manager.start(manager_init)

my_dict = my_manager.dict()
my_dict["clients"] = my_manager.list()
def my_process(my_id, the_dict):
    for i in range(3):
        the_dict["clients"].append(f"{my_id}_{i}")

processes = []
for j in range(4):
    processes.append(Process(target=my_process, args=(j,my_dict)))

for p in processes:
    p.start()

for p in processes:
    p.join()

print(my_dict["clients"])
# ['0_0', '2_0', '0_1', '3_0', '1_0', '0_2', '1_1', '2_1', '3_1', '1_2', '2_2', '3_2']

my_manager.shutdown()



于 2021-08-19T16:57:34.393 回答
0

我设法找到了一种解决方法,同时还保留了asynciomultiprocessing库而没有任何其他库。

首先,由于StreamReaderandStreamWriter对象不可拾取,我不得不使用socket. 这很容易通过一个简单的功能实现:

def get_socket(writer: StreamWriter):
    fileno = writer.get_extra_info('socket').fileno()
    return socket.fromfd(fileno, AddressFamily.AF_INET, socket.SOCK_STREAM)

套接字被插入共享对象(例如Manager().dict(),甚至是自定义类,您必须通过自定义BaseManager实例注册)。现在,由于应用程序是asyncio在库提供的流上构建并使用的,我们可以轻松地将其转换socket为一对StreamReaderStreamWritervia:

node_reader, node_writer = await asyncio.open_connection(sock=self.node_sock)
node_writer.write(mesg_text)
await node_writer.drain()

通过共享对象传递self.node_sock的实例在哪里。socket

于 2021-08-20T23:33:30.347 回答