1

我是 Rust 网络编程和线程的新手,所以我可能在这里遗漏了一些明显的东西。我一直在尝试构建一个简单的聊天应用程序。只是,他用标准库来做,而我正在尝试用 tokio 来做。功能非常简单:客户端向服务器发送消息,服务器确认并将其发送回客户端。这是我的客户端和服务器代码,尽可能地精简:

服务器.rs

#[tokio::main]
async fn main() {
    let server = TcpListener::bind("127.0.0.1:7878").await.unwrap();      
    let mut clients = vec![];
    let (tx, mut rx) = mpsc::channel(32);
    
    loop {
        if let Ok((socket, addr)) = server.accept().await {
            let tx = tx.clone();
            let (mut reader, writer) = split(socket);
            clients.push(writer);

            tokio::spawn(async move {
                loop {
                    let mut buffer = vec![0; 1024];

                    reader.read(&mut buffer).await.unwrap();
                    //get message written by the client and print it
                    //then transmit it on the channel
                    let msg = buffer.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
                    let msg = String::from_utf8(msg).expect("Invalid utf8 message");
                    println!("{}: {:?}", addr, msg);

                    match tx.send(msg).await {
                        Ok(_) => { ()}
                        Err(_) => { println!("Error");}
                    }
                }

            });
        }

        //write each message received back to its client
        if let Some(msg) = rx.recv().await { 
            clients = clients.into_iter().filter_map(|mut x| {
                println!("writing: {:?}", &msg);
                x.write(&msg.clone().into_bytes());
                Some(x)
            }).collect::<Vec<_>>();
        }
        
    }
}

客户端.rs

#[tokio::main]
async fn main() {
    let client = TcpStream::connect("127.0.0.1:7878").await.unwrap();
    let (tx, mut rx) = mpsc::channel::<String>(32);

    tokio::spawn(async move {
        loop {
            let mut buffer = vec![0; 1024];

            // get message sent by the server and print it
            match client.try_read(&mut buffer) {
                Ok(_) => { 
                    let msg = buffer.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
                    println!("Received from server: {:?}", msg); 
                }
                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                    ()
                }
                Err(_) => {
                    println!("Connection with server was severed");
                    break;
                }
            }
            
            // get message transmitted from user input loop
            // then write it to the server
            match rx.try_recv() {
                Ok(message) => {
                    let mut buffer = message.clone().into_bytes();
                    buffer.resize(1024, 0);
                    match client.try_write(&buffer) {
                        Ok(_) => { println!("Write successful");}
                        Err(_) => { println!("Write error");}
                    }
                }
                Err(TryRecvError::Empty) => (), 
                _ => break

            }
        }
    } );
// user input loop here
// takes user message and transmits it on the channel
    
}

发送到服务器工作正常,服务器似乎正在成功写入,如其输出所示:

127.0.0.1:55346: "test message"
writing: "test message"

问题是客户端永远不会从服务器读回消息,而是WouldBlock每次遇到match client.try_read(&mut buffer)块时都会出错。

如果我在保持客户端运行的同时停止服务器,客户端会突然被成功读取的空消息淹没:

Received from server: []
Received from server: []
Received from server: []
Received from server: []
Received from server: []
Received from server: []
Received from server: []
Received from server: []
...

谁能告诉我发生了什么事?

4

1 回答 1

0

以下是您的服务器中发生的情况:

  • 等待客户端连接。
  • 当客户端连接时,生成一个后台任务以从客户端接收。
  • 尝试从通道读取,因为此时客户端不太可能已经发送任何内容,通道是空的。
  • 循环→等待另一个客户端连接。

在等待另一个客户端时,后台任务从第一个客户端接收消息并将其发送到通道,但主任务被阻止等待另一个客户端并且不再尝试从通道读取。

让它工作的最简单方法是摆脱服务器中的通道,并简单地从生成的任务中回显消息。

另一种解决方案是生成一个独立的任务来处理通道并写入客户端。


至于杀死服务器时会发生什么:一旦连接丢失,尝试从套接字读取不会返回错误,而是返回一个空缓冲区。

于 2021-10-03T21:29:52.537 回答