3

Tokio 在其上具有相同的简单 TCP 回显服务器示例:

但是,在这两页中,都没有解释实际发生的事情。这是示例,稍作修改,以便主函数不返回Result<(), Box<dyn std::error::Error>>

use tokio::net::TcpListener;
use tokio::prelude::*;

#[tokio::main]
async fn main() {
    if let Ok(mut tcp_listener) = TcpListener::bind("127.0.0.1:8080").await {
        while let Ok((mut tcp_stream, _socket_addr)) = tcp_listener.accept().await {
            tokio::spawn(async move {
                let mut buf = [0; 1024];
                // In a loop, read data from the socket and write the data back.
                loop {
                    let n = match tcp_stream.read(&mut buf).await {
                        // socket closed
                        Ok(n) if n == 0 => return,
                        Ok(n) => n,
                        Err(e) => {
                            eprintln!("failed to read from socket; err = {:?}", e);
                            return;
                        }
                    };
                    // Write the data back
                    if let Err(e) = tcp_stream.write_all(&buf[0..n]).await {
                        eprintln!("failed to write to socket; err = {:?}", e);
                        return;
                    }
                }
            });
        }
    }
}

阅读 Tokio 文档 ( https://tokio.rs/docs/overview/ ) 后,这是我对这个示例的心智模型。为每个新的 TCP 连接生成一个任务。每当发生读/写错误或客户端结束连接(即n == 0案例)时,任务就会结束​​。因此,如果某个时间点有 20 个连接的客户端,则将有 20 个衍生任务。然而,在底层,这并不等同于产生 20 个线程来同时处理连接的客户端。据我了解,这基本上是异步运行时试图解决的问题。到目前为止正确吗?

接下来,我的心智模型是 tokio 调度程序(例如threaded_scheduler,应用程序默认的多线程,或测试默认的单线程basic_scheduler)将在 1 对 N 线程上同时调度这些任务。(附带问题:对于threaded_scheduler,N 在应用程序的生命周期内是固定的吗?如果是,它是否等于num_cpus::get()?)。如果一个任务正在.await执行readorwrite_all操作,那么调度程序可以使用同一线程为其他 19 个任务之一执行更多工作。还是正确的?

最后,我很好奇外部代码(即.awaiting for的代码tcp_listener.accept())本身是否是一项任务?这样,在 20 个连接的客户端示例中,实际上并不是 20 个任务,而是 21 个:一个用于侦听新连接 + 每个连接一个。所有这 21 个任务都可以在一个或多个线程上同时调度,具体取决于调度程序。在下面的示例中,我将外部代码包装在 atokio::spawn.await句柄中。是否完全等同于上面的例子?

use tokio::net::TcpListener;
use tokio::prelude::*;

#[tokio::main]
async fn main() {
    let main_task_handle = tokio::spawn(async move {
        if let Ok(mut tcp_listener) = TcpListener::bind("127.0.0.1:8080").await {
            while let Ok((mut tcp_stream, _socket_addr)) = tcp_listener.accept().await {
                tokio::spawn(async move {
                    // ... same as above ...
                });
            }
        }
    });
    main_task_handle.await.unwrap();
}
4

1 回答 1

1

这个答案是我从 Alice Ryhl 那里收到的关于 Tokio's Discord 的答案的摘要。非常感谢!

首先,确实,对于多线程调度程序,操作系统线程的数量是固定的num_cpus

.await其次,Tokio 可以在每个线程的基础上交换当前正在运行的任务。

第三,主函数运行在它自己的任务中,它是由#[tokio::main]宏产生的。

因此,对于第一个代码块示例,如果有 20 个连接的客户端,则将有 21 个任务:一个用于主宏 + 一个用于 20 个打开的 TCP 流中的每一个。对于第二个代码块示例,由于额外的外部,将有 22 个任务,tokio::spawn但它是不必要的并且不会增加任何并发性。

于 2020-04-24T17:28:10.040 回答