-1

我试图每隔 N 秒从一个无界队列中提取消息(它们本身就是期货)并将它们生成到 Tokio 处理程序中。

我尝试了几十种变体,但似乎找不到正确的方法。看起来应该是可能的,但我总是遇到未来的类型不匹配或最终出现借用问题。

这是或多或少显示我想要的代码:

let fut = Interval::new_interval(Duration::from_secs(1))
        .for_each(|num| vantage_dequeuer.into_future() )
        .for_each(|message:VantageMessage |{
            handle.spawn(message);
            return Ok(());
        })
        .map_err(|e| panic!("delay errored; err={:?}", e));

core.run(fut);

完整代码:

extern crate futures; // 0.1.24
extern crate tokio; // 0.1.8
extern crate tokio_core; // 0.1.17

use futures::future::ok;
use futures::sync::mpsc;
use futures::{Future, Stream};
use std::thread;
use std::time::Duration;
use tokio::timer::Interval;
use tokio_core::reactor::Core;

type VantageMessage = Box<Future<Item = (), Error = ()> + Send>;

fn main() {
    let (enqueuer, dequeuer) = mpsc::unbounded();
    let new_fut: VantageMessage = Box::new(ok(()).and_then(|_| {
        println!("Message!");
        return Ok(());
    }));
    enqueuer.unbounded_send(new_fut);
    let joinHandle = worker(Some(dequeuer));
    joinHandle.join();
}

/*
    Every second extract one message from dequeuer (or wait if not available)
    and spawn it in the core
*/
fn worker(
    mut vantage_dequeuer: Option<mpsc::UnboundedReceiver<VantageMessage>>,
) -> thread::JoinHandle<()> {
    let dequeuer = dequeuer.take().unwrap();
    let joinHandle = thread::spawn(|| {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let fut = Interval::new_interval(Duration::from_secs(1))
            .for_each(|num| vantage_dequeuer.into_future())
            .for_each(|message: VantageMessage| {
                handle.spawn(message);
                return Ok(());
            })
            .map_err(|e| panic!("delay errored; err={:?}", e));

        core.run(fut);
        println!("Returned!");
    });
    return joinHandle;
}

操场

error[E0425]: cannot find value `dequeuer` in this scope
  --> src/main.rs:33:20
   |
33 |     let dequeuer = dequeuer.take().unwrap();
   |                    ^^^^^^^^ not found in this scope

error[E0599]: no method named `into_future` found for type `std::option::Option<futures::sync::mpsc::UnboundedReceiver<std::boxed::Box<(dyn futures::Future<Item=(), Error=()> + std::marker::Send + 'static)>>>` in the current scope
  --> src/main.rs:38:46
   |
38 |             .for_each(|num| vantage_dequeuer.into_future())
   |                                              ^^^^^^^^^^^
   |
   = note: the method `into_future` exists but the following trait bounds were not satisfied:
           `&mut std::option::Option<futures::sync::mpsc::UnboundedReceiver<std::boxed::Box<(dyn futures::Future<Item=(), Error=()> + std::marker::Send + 'static)>>> : futures::Stream`
4

1 回答 1

1

Interval并且UnboundedReceiver都是流,所以我会用Stream::zip它们来组合它们:

压缩流等待两个流生成一个项目,然后返回该对。如果发生错误,则该错误将立即返回。如果任一流结束,则压缩流也将结束。

extern crate futures; // 0.1.24
extern crate tokio;   // 0.1.8
extern crate tokio_core; // 0.1.17

use futures::{
    future::ok,
    sync::mpsc,
    {Future, Stream},
};
use std::{thread, time::Duration};
use tokio::timer::Interval;
use tokio_core::reactor::Core;

type VantageMessage = Box<Future<Item = (), Error = ()> + Send>;

pub fn main() {
    let (tx, rx) = mpsc::unbounded();

    let new_fut: VantageMessage = Box::new(ok(()).and_then(|_| {
        println!("Message!");
        Ok(())
    }));
    tx.unbounded_send(new_fut).expect("Unable to send");
    drop(tx); // Close the sending side

    worker(rx).join().expect("Thread had a panic");
}

fn worker(queue: mpsc::UnboundedReceiver<VantageMessage>) -> thread::JoinHandle<()> {
    thread::spawn(|| {
        let mut core = Core::new().unwrap();
        let handle = core.handle();

        core.run({
            Interval::new_interval(Duration::from_secs(1))
                .map_err(|e| panic!("delay errored; err={}", e))
                .zip(queue)
                .for_each(|(_, message)| {
                    handle.spawn(message);
                    Ok(())
                })
        })
        .expect("Unable to run reactor");
        println!("Returned!");
    })
}

请注意,这实际上并没有等待任何生成的期货在反应堆关闭之前完成。如果你想要,我会切换到tokio::runand tokio::spawn

fn worker(queue: mpsc::UnboundedReceiver<VantageMessage>) -> thread::JoinHandle<()> {
    thread::spawn(|| {
        tokio::run({
            Interval::new_interval(Duration::from_secs(1))
                .map_err(|e| panic!("delay errored; err={}", e))
                .zip(queue)
                .for_each(|(_, message)| {
                    tokio::spawn(message);
                    Ok(())
                })
        });
        println!("Returned!");
    })
}
于 2018-10-12T14:34:24.267 回答