0

我实现了一个 gRPC 服务器,它将数据流式传输到其客户端。我希望服务器通过“通道通知器”开始发送回另一个模块事件的数据。

为了说明要点,我将实现简化为“hello”服务器。gRPC 原型文件是:

syntax = "proto3";

package hello;

service Greeter {
    rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}

rs文件很简单:

pub mod hello_world {
    tonic::include_proto!("hello");
}

// This is my server's implementation.
pub struct MyGreeter {
    // The new data is sent over to gRPC server via this mpsc::Receiver channel.
    notifer: mpsc::Receiver<i32>,
}


#[tonic::async_trait]
impl Greeter for MyGreeter {

    type SayHelloStream = ReceiverStream<Result<HelloReply, Status>>;

    async fn say_hello(
        &self,
        request: Request<HelloRequest>,
    ) -> Result<Response<Self::SayHelloStream>, Status> {
        println!("Got a request from {:?}", request.remote_addr());

        // compiling error here: this data with lifetime `'life0`...
        let (api_tx, api_rx) = mpsc::channel(3);

        tokio::spawn(async move {

            // compiling error here: ...and is required to live as long as `'static` here
            self.notifer.recv().await.unwrap();

            api_tx.send(Ok(HelloReply {
                message: format!("Hello {}!", request.into_inner().name),
            })).await.unwrap();
        });

        Ok(Response::new(ReceiverStream::new(api_rx)))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse().unwrap();

    // Create a pair of channel endpoint
    // tx is used somewhere for sending new data.
    let (tx, rx) = mpsc::channel(3);

    let greeter = MyGreeter {
        // the receiver endpoint
        notifer: rx,
    };

    println!("GreeterServer listening on {}", addr);

    Server::builder()
        .add_service(GreeterServer::new(greeter))
        .serve(addr)
        .await?;

    Ok(())
}

我在 main 中创建了一对通道端点,然后将接收器端点用作 gRPC 服务器中的通知器。发送端点被分配给另一个模块,该模块在此代码中省略,用于发送新生成的数据。

我知道这个错误一定与ownershipRust 的有关。我试图将通道端点包装Arc<T>Mutex<T>. 但是,它不起作用。

4

0 回答 0