2

我有一些目前看起来像这样的 Rust 代码

fn read_stdin(mut tx: mpsc::Sender<String>) {
    loop {
        // read from stdin and send value over tx.
    }
}

fn sleep_for(n: u64) -> impl Future<Item = (), Error = ()> {
    thread::sleep(time::Duration::from_millis(n));
    println!("[{}] slept for {} ms", Local::now().format("%T%.3f"), n);
    future::ok(())
}

fn main() {
    let (stdin_tx, stdin_rx) = mpsc::channel(0);
    thread::spawn(move || read_stdin(stdin_tx));

    let server = stdin_rx
        .map(|data| data.trim().parse::<u64>().unwrap_or(0))
        .for_each(|n| tokio::spawn(sleep_for(n * 100)));
    tokio::run(server);
}

它使用 tokio 和 futures,目的是运行一些“cpu 繁重”工作(由sleep_for函数模拟),然后将一些东西输出到stdout.

当我运行它时,一切似乎都很好,我得到了这个输出

2
[00:00:00.800] slept for 200 ms
10
1
[00:00:01.800] slept for 1000 ms
[00:00:01.900] slept for 100 ms

带有该值的第一个输出2完全符合预期,我看到 200 毫秒后打印的时间戳。但是对于下一个输入,很明显该sleep_for函数是按顺序执行的,而不是同时执行的。

我想看到的输出是

2
[00:00:00.800] slept for 200 ms
10
1
[00:00:00.900] slept for 100 ms
[00:00:01.900] slept for 1000 ms

似乎要获得我正在寻找的输出,我想同时sleep_for(10)执行sleep_for(1)。我将如何使用期货和 tokio 在 Rust 中执行此操作?

(注意:时间戳的实际值并不重要,我更多地使用它们来显示程序中的执行顺序)

4

1 回答 1

0

找到了使用futures-timer板条箱的解决方案。

use chrono::Local;
use futures::{future, sync::mpsc, Future, Sink, Stream};
use futures_timer::Delay;
use std::{io::stdin, thread, time::Duration};

fn read_stdin(mut tx: mpsc::Sender<String>) {
    let stdin = stdin();
    loop {
        let mut buf = String::new();
        stdin.read_line(&mut buf).unwrap();
        tx = tx.send(buf).wait().unwrap()
    }
}

fn main() {
    let (stdin_tx, stdin_rx) = mpsc::channel(0);
    thread::spawn(move || read_stdin(stdin_tx));

    let server = stdin_rx
        .map(|data| data.trim().parse::<u64>().unwrap_or(0) * 100)
        .for_each(|delay| {
            println!("[{}] {} ms -> start", Local::now().format("%T%.3f"), delay);
            tokio::spawn({
                Delay::new(Duration::from_millis(delay))
                    .and_then(move |_| {
                        println!("[{}] {} ms -> done", Local::now().format("%T%.3f"), delay);
                        future::ok(())
                    })
                    .map_err(|e| panic!(e))
            })
        });

    tokio::run(server);
}

问题在于,宁愿让未来停止,然后通知当前任务,问题中提供的代码只是让线程休眠,因此无法取得进展。

更新:现在我刚刚遇到tokio-timer这似乎是这样做的标准方式。

于 2018-11-18T09:48:16.227 回答