我实现了一个 gRPC 服务器,它将数据流式传输到其客户端。我希望服务器通过“通道通知器”开始发送回另一个模块事件的数据。
为了说明要点,我将实现简化为“hello”服务器。gRPC 原型文件是:
syntax = "proto3";
package hello;
service Greeter {
rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
rs
文件很简单:
pub mod hello_world {
tonic::include_proto!("hello");
}
// This is my server's implementation.
pub struct MyGreeter {
// The new data is sent over to gRPC server via this mpsc::Receiver channel.
notifer: mpsc::Receiver<i32>,
}
#[tonic::async_trait]
impl Greeter for MyGreeter {
type SayHelloStream = ReceiverStream<Result<HelloReply, Status>>;
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<Self::SayHelloStream>, Status> {
println!("Got a request from {:?}", request.remote_addr());
// compiling error here: this data with lifetime `'life0`...
let (api_tx, api_rx) = mpsc::channel(3);
tokio::spawn(async move {
// compiling error here: ...and is required to live as long as `'static` here
self.notifer.recv().await.unwrap();
api_tx.send(Ok(HelloReply {
message: format!("Hello {}!", request.into_inner().name),
})).await.unwrap();
});
Ok(Response::new(ReceiverStream::new(api_rx)))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
// Create a pair of channel endpoint
// tx is used somewhere for sending new data.
let (tx, rx) = mpsc::channel(3);
let greeter = MyGreeter {
// the receiver endpoint
notifer: rx,
};
println!("GreeterServer listening on {}", addr);
Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;
Ok(())
}
我在 main 中创建了一对通道端点,然后将接收器端点用作 gRPC 服务器中的通知器。发送端点被分配给另一个模块,该模块在此代码中省略,用于发送新生成的数据。
我知道这个错误一定与ownership
Rust 的有关。我试图将通道端点包装Arc<T>
在Mutex<T>
. 但是,它不起作用。