0

我正在尝试使用 Rust 及其 Tokio 运行时创建一个客户端,并使用 and 将其绑定到pyo3python pyo3-asyncio。举一个最小的例子,假设我想要一个Client连接到127.0.0.1:1234并有一个echo方法来处理通过套接字传入的数据的类。我希望该函数返回一个 Python 协程,以便我可以将它与asyncPython 代码库中的函数一起使用。示例用例:

import asyncio
from client import Client  # pyo3-bound rust code

c = Client()
asyncio.run(c.echo())

我已经按照文档pyo3-asyncio并找到了这个函数。我写了这个(希望是最小的)片段来重现这个问题:

use pyo3::prelude::*;

use tokio::runtime::Runtime;
use tokio::net::TcpStream;
use tokio::io::AsyncReadExt;


#[pyclass]
struct Client {
    rt: Runtime,
    stream: TcpStream,
}

#[pymethods]
impl Client {
    #[new]
    fn new() -> PyResult<Self> {
        let rt = Runtime::new().expect("Error creating runtime");
        let stream = rt.block_on(async {
            TcpStream::connect("127.0.0.1:1234").await
        })?;
        Ok(Self { rt, stream })
    }

    fn echo(&mut self) -> PyResult<PyObject> {
        let gil = Python::acquire_gil();
        let py = gil.python();
        pyo3_asyncio::tokio::into_coroutine(py, async move {
            loop {
                let mut vec = Vec::new();
                self.stream.read(&mut vec);
                println!("{:?}", vec);
            }
        })
    }
}


#[pymodule]
fn client(py: Python, m: &PyModule) -> PyResult<()> {
    let runtime = Runtime::new().expect("Tokio runtime error");
    m.add_class::<Client>()?;
    pyo3_asyncio::tokio::init(runtime);

    Ok(())
}

问题似乎是self,也就是说Client,有一个匿名的生命周期,但预计会有'static一个。编译器说:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/lib.rs:28:60
   |
25 |       fn call_every_second(&mut self, callback: PyObject) -> PyResult<PyObject> {
   |                            --------- this data with an anonymous lifetime `'_`...
...
28 |           pyo3_asyncio::tokio::into_coroutine(py, async move {
   |  ____________________________________________________________^
29 | |             loop {
30 | |                 let mut vec = Vec::new();
31 | |                 self.stream.read(&mut vec);
32 | |                 println!("{:?}", vec);
33 | |             }
34 | |         })
   | |_________^ ...is captured here...
   |

有没有办法可以确保客户端的静态生命周期?也许我错误地解决了这个问题?

4

1 回答 1

1

into_coroutine()函数需要一个'static未来,所以你不能在未来借用Client'stream字段。一种解决方法是进行更改Client,使其存储streamArc<Mutex<TcpStream>>

struct Client {
    rt: Runtime,
    stream: Arc<Mutex<TcpStream>>, // using tokio::sync::Mutex
}

然后,您可以into_coroutine()通过以下方式授予对流的访问权限Arc

    // ...
    let stream = Arc::clone(&self.stream);
    pyo3_asyncio::tokio::into_coroutine(py, async move {
        loop {
            let mut vec = Vec::new();
            stream.lock().await.read(&mut vec).await?;
            println!("{:?}", vec);
        }
    })
}
于 2021-10-12T11:30:24.107 回答