6

我试图interval_timer在清空队列后取消间隔(),但不确定什么是正确的策略。

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

drop按照 gitter 中的建议进行了尝试,但最终出现错误:

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    if some_vars.len() == 1 {
        drop(interval_timer);
    }

    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

错误:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:72:22
   |
60 |     let mut interval_timer = tokio_timer::Timer::default();
   |         ------------------ captured outer variable
...
72 |                 drop(interval_timer);
   |                      ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
4

3 回答 3

3

对于要从​​流外部取消流的情况,请参阅stream-cancel


对于您的特定情况,将您的集合转换为流并将其与间隔计时器一起压缩是最简单的。这样,当集合为空时,结果流自然会停止:

use futures::{future, stream, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let some_vars = stream::iter_ok(some_vars.into_iter().rev());
        let combined = timer.zip(some_vars);

        combined.for_each(move |(_, item)| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}

and_then否则,您可以通过使用从集合中删除值并控制流是否应该继续来停止流:

use futures::{future, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let limited = timer.and_then(move |_| {
            if some_vars.len() <= 4 {
                Err(())
            } else {
                some_vars.pop().ok_or(())
            }
        });

        limited.for_each(move |item| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}
于 2019-11-05T15:49:16.827 回答
0

我创建了 Tokio 结构的副本Interval,添加了对我的应用程序方法的引用以指示何时提前中断。

就我而言,我想中断Interval关机。

我的间隔轮询方法如下所示:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
    if self.session.read().unwrap().shutdown {
        return Ok(Async::Ready(Some(Instant::now())));
    }

    // Wait for the delay to be done
    let _ = match self.delay.poll() {

然后你需要保持对任务的处理(task = futures::task::current()在超时任务内运行时调用)。

然后,您可以在任何时候调用task.notify()以启动间隔并点击您的突破代码,打断Interval早期。

里面Interval有一个Delay可以修改的结构,您可以创建一个Interval可以中断并更改超时的结构,这样您可以中断一次然后继续。

于 2019-11-04T21:25:03.673 回答
-2

tokio_timer::Intervalimplements futures::Stream,所以尝试使用该take_while方法:

let s = timer
    .take_while(|()| 
        future::ok(is_net_completed()))
    .for_each(move |_| {
        println!("Woke up");
        // ...
    })
于 2018-03-13T12:18:04.220 回答