0

如果通过 UDP 连接接收到某个值,我想创建一个完成的未来。

澄清一下,假设我发送一个 id 为 2 的 ping 并希望异步等待具有相同 id 的 pong。我的想法是使用类似的东西send_ping(endpoint, id) -> Future并与未来一起工作(例如将其传递给其他函数),知道它要么解析为匹配的 pong 要么解析为超时。

我的想法的草图:

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::*;
use futures::Stream;
use futures::sync::mpsc;

fn main() {
    let mut core = Core::new().unwrap();

    // simulate my UDP socket connection
    let (remote, socket) = mpsc::unbounded::<i32>();
    remote.unbounded_send(1).unwrap();
    remote.unbounded_send(2).unwrap();
    remote.unbounded_send(3).unwrap();

    let reader = socket.for_each(|id| {
        println!("received {}", id);
        Ok(())
    });

    // create future which completes if 2 is received
    // or after a given timeout

    core.run(reader).unwrap();
}

这甚至可能吗?我找不到任何例子。

4

1 回答 1

0

您可以使用tokio_timer::Timer::timeout_streamfutures::Stream::filter

extern crate futures;
extern crate tokio_core;
extern crate tokio_timer;

use tokio_core::reactor::*;
use futures::Stream;
use futures::sync::mpsc;
use std::time::Duration;

fn main() {
    let mut core = Core::new().unwrap();
    let timer = tokio_timer::Timer::default();

    // simulate my UDP socket connection
    let (remote, socket) = mpsc::unbounded::<i32>();
    remote.unbounded_send(1).unwrap();
    remote.unbounded_send(2).unwrap();
    remote.unbounded_send(3).unwrap();

    let consumer = timer
        .timeout_stream(socket, Duration::from_secs(2))
        .filter(|&v| v == 2)
        .for_each(|id| {
            println!("received {}", id);
            Ok(())
        });

    println!("{:?}", core.run(consumer));
}
于 2018-02-07T16:49:31.677 回答