我正在尝试构建一个从 SFTP 服务器中提取文件并将它们上传到 S3 的服务。
对于 SFTP 部分,我使用的是async-ssh2,它为我提供了一个实现futures::AsyncRead. 由于这些 SFTP 文件可能非常大,我正在尝试将此File处理程序转换为ByteStream可以使用 Rusoto 上传的文件。看起来 aByteStream可以用 a 初始化futures::Stream。
我的计划是Stream在File对象上实现(基于此处的代码)以与 Rusoto 兼容(代码在下面复制以供后代使用):
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
pub struct ByteStream<R>(R);
impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
type Item = u8;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut buf = [0; 1];
match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
Ok(n) if n != 0 => Some(buf[0]).into(),
_ => None.into(),
}
}
}
这是做这件事的好方法吗?我看到了这个问题,但它似乎正在使用tokio::io::AsyncRead. 是否使用tokio规范的方式来执行此操作?如果是这样,有没有办法从转换futures_io::AsyncRead为tokio::io::AsyncRead?