4

我正在创建一个将产生其他任务的任务。其中一些需要一些时间,因此无法等待,但可以并行运行:

src/main.rs

use crossbeam::crossbeam_channel::{bounded, select};

#[tokio::main]
async fn main() {
    let (s, r) = bounded::<usize>(1);

    tokio::spawn(async move {
        let mut counter = 0;
        loop {
            let loop_id = counter.clone();
            tokio::spawn(async move { // why this one was not fired?
                println!("inner task {}", loop_id);
            }); // .await.unwrap(); - solves issue, but this is long task which cannot be awaited
            println!("loop {}", loop_id);
            select! {
                recv(r) -> rr => {
                    // match rr {
                    //     Ok(ee) => {
                    //         println!("received from channel {}", loop_id);
                    //         tokio::spawn(async move {
                    //             println!("received from channel task {}", loop_id);
                    //         });
                    //     },
                    //     Err(e) => println!("{}", e),
                    // };
                },
                // more recv(some_channel) -> 
            }
            counter = counter + 1;
        }
    });

    // let s_clone = s.clone();
    // tokio::spawn(async move {
    //     s_clone.send(2).unwrap();
    // });

    loop {
        // rest of the program
    }
}

我注意到奇怪的行为。这输出:

loop 0

我期待它也能输出inner task 0

如果我向通道发送一个值,输出将是:

loop 0
inner task 0
loop 1

这是缺失的inner task 1

为什么会inner task产生一个延迟循环?

我第一次注意到“从通道任务接收”的这种行为延迟了一个循环,但是当我减少代码以准备样本时,这种情况开始发生在“内部任务”中。值得一提的是,如果我写第二个tokio::spawn权利给另一个,只有最后一个会有这个问题。tokio::spawn打电话时我应该注意什么select!?是什么导致这一循环延迟?

Cargo.toml 依赖项

[dependencies]
tokio = { version = "0.2", features = ["full"] }
crossbeam = "0.7"

生锈 1.46,Windows 10

4

1 回答 1

3

select!正在阻塞,并且文档tokio::spawn

生成的任务可以在当前线程上执行,也可以发送到不同的线程执行。

在这种情况下,select!“未来”实际上是一个阻塞函数,并且spawn不使用新线程(在第一次调用或循环内)。因为你没有告诉 tokio 你要阻塞,所以 tokio 认为不需要另一个线程(从 tokio 的角度来看,你只有 3 个永远不应该阻塞的期货,所以你为什么还需要另一个线程呢?)。

解决方案是使用tokio::task::spawn_blockingfor the select!-ing 闭包(这将不再是未来,async move {}现在也是move || {})。现在 tokio 会知道这个函数实际上是阻塞的,并将它移动到另一个线程(同时将所有实际的未来保留在其他执行线程中)。

use crossbeam::crossbeam_channel::{bounded, select};

#[tokio::main]
async fn main() {
    let (s, r) = bounded::<usize>(1);

    tokio::task::spawn_blocking(move || {
        // ...
    });

    loop {
        // rest of the program
    }
}

链接到游乐场

另一种可能的解决方案是使用像这样的非阻塞通道tokio::sync::mpsc,您可以在该通道上使用await并获得预期的行为,例如使用直接或使用的游乐场示例,如下所示:recv().awaittokio::select!

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut s, mut r) = mpsc::channel::<usize>(1);

    tokio::spawn(async move {
        loop {
            // ...
            tokio::select! {
                Some(i) = r.recv() => {
                    println!("got = {}", i);
                }
            }
        }
    });

    loop {
        // rest of the program
    }
}

链接到游乐场

于 2020-09-30T09:27:45.263 回答