Tokio 在其上具有相同的简单 TCP 回显服务器示例:
- GitHub 主页 ( https://github.com/tokio-rs/tokio )
- API 参考主页 ( https://docs.rs/tokio/0.2.18/tokio/ )
但是,在这两页中,都没有解释实际发生的事情。这是示例,稍作修改,以便主函数不返回Result<(), Box<dyn std::error::Error>>
:
use tokio::net::TcpListener;
use tokio::prelude::*;
#[tokio::main]
async fn main() {
if let Ok(mut tcp_listener) = TcpListener::bind("127.0.0.1:8080").await {
while let Ok((mut tcp_stream, _socket_addr)) = tcp_listener.accept().await {
tokio::spawn(async move {
let mut buf = [0; 1024];
// In a loop, read data from the socket and write the data back.
loop {
let n = match tcp_stream.read(&mut buf).await {
// socket closed
Ok(n) if n == 0 => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
return;
}
};
// Write the data back
if let Err(e) = tcp_stream.write_all(&buf[0..n]).await {
eprintln!("failed to write to socket; err = {:?}", e);
return;
}
}
});
}
}
}
阅读 Tokio 文档 ( https://tokio.rs/docs/overview/ ) 后,这是我对这个示例的心智模型。为每个新的 TCP 连接生成一个任务。每当发生读/写错误或客户端结束连接(即n == 0
案例)时,任务就会结束。因此,如果某个时间点有 20 个连接的客户端,则将有 20 个衍生任务。然而,在底层,这并不等同于产生 20 个线程来同时处理连接的客户端。据我了解,这基本上是异步运行时试图解决的问题。到目前为止正确吗?
接下来,我的心智模型是 tokio 调度程序(例如threaded_scheduler
,应用程序默认的多线程,或测试默认的单线程basic_scheduler
)将在 1 对 N 线程上同时调度这些任务。(附带问题:对于threaded_scheduler
,N 在应用程序的生命周期内是固定的吗?如果是,它是否等于num_cpus::get()
?)。如果一个任务正在.await
执行read
orwrite_all
操作,那么调度程序可以使用同一线程为其他 19 个任务之一执行更多工作。还是正确的?
最后,我很好奇外部代码(即.await
ing for的代码tcp_listener.accept()
)本身是否是一项任务?这样,在 20 个连接的客户端示例中,实际上并不是 20 个任务,而是 21 个:一个用于侦听新连接 + 每个连接一个。所有这 21 个任务都可以在一个或多个线程上同时调度,具体取决于调度程序。在下面的示例中,我将外部代码包装在 atokio::spawn
和.await
句柄中。是否完全等同于上面的例子?
use tokio::net::TcpListener;
use tokio::prelude::*;
#[tokio::main]
async fn main() {
let main_task_handle = tokio::spawn(async move {
if let Ok(mut tcp_listener) = TcpListener::bind("127.0.0.1:8080").await {
while let Ok((mut tcp_stream, _socket_addr)) = tcp_listener.accept().await {
tokio::spawn(async move {
// ... same as above ...
});
}
}
});
main_task_handle.await.unwrap();
}