3

我正在尝试构建一个从 SFTP 服务器中提取文件并将它们上传到 S3 的服务。

对于 SFTP 部分,我使用的是async-ssh2,它为我提供了一个实现futures::AsyncRead. 由于这些 SFTP 文件可能非常大,我正在尝试将此File处理程序转换为ByteStream可以使用 Rusoto 上传的文件。看起来 aByteStream可以用 a 初始化futures::Stream

我的计划是StreamFile对象上实现(基于此处的代码)以与 Rusoto 兼容(代码在下面复制以供后代使用):

use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};

pub struct ByteStream<R>(R);

impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = u8;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = [0; 1];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
            Ok(n) if n != 0 => Some(buf[0]).into(),
            _ => None.into(),
        }
    }
}

这是做这件事的好方法吗?我看到了这个问题,但它似乎正在使用tokio::io::AsyncRead. 是否使用tokio规范的方式来执行此操作?如果是这样,有没有办法从转换futures_io::AsyncReadtokio::io::AsyncRead

4

1 回答 1

1

这就是我进行转换的方式。我基于上面的代码,除了我使用更大的缓冲区(8 KB)来减少网络调用的数量。

use bytes::Bytes;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
use futures_io::AsyncRead;
use rusoto_s3::StreamingBody;

const KB: usize = 1024;

struct ByteStream<R>(R);

impl<R: AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = Result<Bytes, std::io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = vec![0_u8; 8 * KB];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf[..])) {
            Ok(n) if n != 0 => Some(Ok(Bytes::from(buf))).into(),
            Ok(_) => None.into(),
            Err(e) => Some(Err(e)).into(),
        }
    }
}

允许我这样做:

fn to_streamingbody(body: async_ssh2::File) -> Option<StreamingBody> {
    let stream = ByteStream(body);
    Some(StreamingBody::new(stream))
}

(注意rusoto::StreamingBodyrusoto::ByteStream是别名)

于 2020-06-10T21:50:00.073 回答