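I am trying to clone a socket and send it to another process in Python.

The socket is created in Rust and exposed to Python as an object through PyO3.

Here is the code for the shared socket: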


use pyo3::prelude::*;

use socket2::{Domain, Protocol, Socket, Type};
use std::net::SocketAddr;

#[pyclass]
#[derive(Debug)]
pub struct SocketHeld {
    pub socket: Socket,
}

#[pymethods]
impl SocketHeld {
    #[new]
    pub fn new(address: String, port: i32) -> PyResult<SocketHeld> {
        let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
        println!("{}", address);
        let address: SocketAddr = address.parse()?;
        socket.set_reuse_address(true)?;
        //socket.set_reuse_port(true)?;
        socket.bind(&address.into())?;
        socket.listen(1024)?;

        Ok(SocketHeld { socket })
    }

    pub fn try_clone(&self) -> PyResult<SocketHeld> {
        let copied = self.socket.try_clone()?;
        Ok(SocketHeld { socket: copied })
    }
}

impl SocketHeld {
    pub fn get_socket(&self) -> Socket {
        self.socket.try_clone().unwrap()
    }
}


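Below is the Python code in which I try to start two separate processes. I have tried the standard multiprocessing library, the multiprocess fork of it, and even the pathos library.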



    def start(self, url="127.0.0.1", port=5000):
        """
        [Starts the server]

        :param port [int]: [represents the port number at which the server is listening]
        """
        socket = SocketHeld(f"0.0.0.0:{port}", port)
        if not self.dev:
            from pathos.pools import ProcessPool
            pool = ProcessPool(nodes=2)
            # spawned_process(url, port, self.routes, socket.try_clone(), f"Process {1}")
            pool.map(spawned_process, [(url, port, self.routes, socket.try_clone(), f"Process {1}"), (url, port, self.routes, socket.try_clone(), f"Process {2}")])
            # for i in range(2):
            #     copied = socket.try_clone()
            #     p = Pool().map(
            #         spawned_process,
            #         args=(self.routes, copied, f"Process {i}"),
            #     )
            #     p.start()

            # input("Press Cntrl + C to stop \n")
            # self.server.start(url, port)
        else:
            ...

However, I still get an error saying the object cannot be serialized.

This is the error I receive:


Traceback (most recent call last):
  File "integration_tests/base_routes.py", line 75, in <module>
    app.start(port=5000, url='0.0.0.0')
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/robyn/__init__.py", line 95, in start
    pool.map(spawned_process, [(url, port, self.routes, socket.try_clone(), f"Process {1}"), (url, port, self.routes, socket.try_clone(), f"Process {2}")])
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/pathos/multiprocessing.py", line 139, in map
    return _pool.map(star(f), zip(*args)) # chunksize
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/pool.py", line 771, in get
    raise self._value
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/pool.py", line 537, in _handle_tasks
    put(task)
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/connection.py", line 209, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/reduction.py", line 54, in dumps
    cls(buf, protocol, *args, **kwds).dump(obj)
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/dill/_dill.py", line 498, in dump
    StockPickler.dump(self, obj)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 485, in dump
    self.save(obj)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 899, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 899, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 576, in save
    rv = reduce(self.proto)
TypeError: cannot pickle 'builtins.SocketHeld' object


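Is something conceptually wrong here? What is the solution?

PS:

This is the function in which I am trying to start the server runtime in each process: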


def spawned_process(url, port, handlers, socket, name):
    import asyncio
    import uvloop

    uvloop.install()
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)

    print(handlers)
    server = Server()


    for i in handlers:
        route_type, endpoint, handler, is_async, number_of_params = i
        print(i)
        server.add_route(route_type, endpoint, handler, is_async, number_of_params)

    print(socket, name)
    server.start(url, port, socket, name)
    asyncio.get_event_loop().run_forever()

1 Answer

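A socket is basically just a process-specific reference to an OS kernel structure. Pickling only deals with the user-space part of that reference, not with the kernel structure behind it, so a socket cannot simply be pickled and restored in another process, on another machine, and so on.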
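
The same limitation can be seen with a plain Python socket, without any Rust code involved. The following is a minimal sketch; the helper name `try_pickle_socket` is made up for illustration, and the exact wording of the error message may differ between Python versions.

```python
import pickle
import socket


def try_pickle_socket() -> str:
    """Attempt to pickle a standard-library socket and report what happens."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # The socket object only wraps a file descriptor number; the kernel
        # state behind it cannot be captured by pickle, so this is refused.
        pickle.dumps(sock)
    except TypeError as exc:
        return f"TypeError: {exc}"
    finally:
        sock.close()
    return "pickled"


if __name__ == "__main__":
    print(try_pickle_socket())
```

A PyO3 class such as `SocketHeld` fails for a related reason: it defines no pickling support, and even if it did, the pickled bytes could not carry the kernel-side socket.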

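On UNIX systems, file descriptors can be passed between processes over UNIX domain sockets, which creates another reference to the same kernel structure in the receiving process. This does not work for SSL sockets, though, because they have additional user-space state that is carried neither by the file descriptor nor by Python's pickling.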
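
Descriptor passing of this kind can be sketched in pure Python with `sendmsg`/`recvmsg` and `SCM_RIGHTS`. This is only an illustration under some assumptions: the names `send_fd`, `recv_fd` and `demo` are made up, the example uses `os.fork` and therefore runs on UNIX only, and it uses a plain Python listening socket. A Rust-backed object like `SocketHeld` would first have to expose its raw descriptor, which the code in the question does not do.

```python
import array
import os
import socket


def send_fd(channel: socket.socket, fd: int) -> None:
    """Send one file descriptor over a connected AF_UNIX socket."""
    fds = array.array("i", [fd])
    # At least one byte of ordinary data has to accompany the ancillary data.
    channel.sendmsg([b"F"], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])


def recv_fd(channel: socket.socket) -> int:
    """Receive one file descriptor that was sent with send_fd()."""
    fds = array.array("i")
    _, ancdata, _, _ = channel.recvmsg(1, socket.CMSG_LEN(fds.itemsize))
    for level, kind, data in ancdata:
        if level == socket.SOL_SOCKET and kind == socket.SCM_RIGHTS:
            # Ignore any trailing bytes that do not form a whole int.
            fds.frombytes(data[: len(data) - (len(data) % fds.itemsize)])
            return fds[0]
    raise RuntimeError("no file descriptor received")


def demo() -> bool:
    """Pass a listening TCP socket to a forked child; True if it arrived intact."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(16)
    expected = listener.getsockname()

    parent_end, child_end = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    pid = os.fork()
    if pid == 0:
        exit_code = 1
        try:
            parent_end.close()
            # Drop the copy inherited through fork, so the child can only
            # reach the listener through the descriptor it receives.
            listener.close()
            received = socket.socket(fileno=recv_fd(child_end))
            exit_code = 0 if received.getsockname() == expected else 1
        finally:
            os._exit(exit_code)

    child_end.close()
    send_fd(parent_end, listener.fileno())
    _, status = os.waitpid(pid, 0)
    parent_end.close()
    listener.close()
    return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0


if __name__ == "__main__":
    print("descriptor passed:", demo())
```

Only the integer descriptor travels in user space; the kernel installs a new reference to the same socket in the receiver. Python 3.9 and later also ship `socket.send_fds` and `socket.recv_fds`, which wrap the same mechanism.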

Answered 2021-10-31T16:09:14.263