8

我有一个AsyncRead并且想Stream<Item = tokio::io::Result<Bytes>>用 tokio 0.2 和 futures 0.3 将它转换为一个。

我能做的最好的事情是:

use bytes::Bytes; // 0.4.12
use futures::stream::{Stream, TryStreamExt};; // 0.3.1
use tokio::{fs::File, io::Result}; // 0.2.4
use tokio_util::{BytesCodec, FramedRead}; // 0.2.0

#[tokio::main]
async fn main() -> Result<()> {
    let file = File::open("some_file.txt").await?;
    let stream = FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
    fn_that_takes_stream(stream)
}

fn fn_that_takes_stream<S, O>(s: S) -> Result<()>
where
    S: Stream<Item = Result<Bytes>>,
{
    //...
    Ok(())
}

似乎应该有一种更简单的方法;我很惊讶 Tokio 没有包含一个编解码器来获取流Bytes而不是,BytesMut或者不只是一个扩展特征提供了一种将 anAsyncRead转换为Stream. 我错过了什么吗?

4

1 回答 1

1

如果您可以使用 tokio 1.0 或 0.3,则 tokio-util 现在tokio_util::io::ReaderStream从版本 0.4 开始。

let file = File::open("some_file.txt").await?;
let stream = tokio_util::io::ReaderStream::new(file);
fn_that_takes_stream(stream)
于 2021-11-15T22:53:35.147 回答