2

我认为这.for_each()可以解决问题,但它只返回通道中的第一个(未来)项目,然后返回,如果通道是空的。.for_each()像在非 Tokio/future 上下文中那样无限期地读取任务中的频道的方法是什么?

let tx_origs_reader = rx_chan.for_each(move |tx_orig| {
    //save receiver side tx to db
    let mut tx_origs_once = tx_origs_inner.borrow_mut();
    tx_origs_once.push(tx_orig.clone());  
    Ok(())
});
handle.spawn(tx_origs_reader.then(|err| {
    println!("This returns after first item without an error {:?}", err);
    Ok(())
}));
4

1 回答 1

1

处理方式是最好的for_each方式,应该有效——而且有效!在 Gitter 中的 tokio-rs 人员的帮助下(谢谢!)使用简单的测试代码,它被调试为 tx 端的问题。

在我看来,Rust 是如此先进,以至于它实际上知道在这种情况下放弃任务:基于此的日志输出让我感到实际上认为问题出在 rx 方面,尽管它一直是 tx 方面的问题。

于 2017-04-19T15:44:51.890 回答