5

我想在 Rust 中使用 actix-web 流式传输加密文件。我有一个循环使用氧化钠逐块解密加密文件。我想将块发送给客户端。

我的循环如下所示:

while stream.is_not_finalized() {
    match in_file.read(&mut buffer) {
        Ok(num_read) if num_read > 0 => {
            let (decrypted, _tag) = stream
                .pull(&buffer[..num_read], None)
                .map_err(|_| error::ErrorInternalServerError("Incorrect password"))
                .unwrap();

            // here I want to send decrypted to HttpResponse
            continue;
        }
        Err(e) => error::ErrorInternalServerError(e),
        _ => error::ErrorInternalServerError("Decryption error"), // reached EOF
    };
}

我找到了一个streaming方法,它需要 aStream作为参数。如何创建一个可以逐块添加的流?

4

1 回答 1

2

根据您的工作流程(您的数据块有多大,解密需要多长时间等),您可能对如何制作流有不同的选择。我想到的最合法的方式是使用某种带有通道的线程池在线程和您的处理程序之间进行通信。在这种情况下, Tokio 的mpsc可以是一个选项,并且它的Receiver已经实现了 Stream 并且您可以使用线程中的Sender的 try_send 从线程中提供它,考虑到您使用的是无界通道或有足够长度的有界通道,它应该工作。

如果您的解密过程不那么耗时而被视为阻塞,或者您只是想看看如何为 actix 实现 Stream,则另一种可能的选择是,这是一个示例:

use std::pin::Pin;
use std::task::{Context, Poll};

use actix_web::{get, App, HttpResponse, HttpServer, Responder};
use pin_project::pin_project;
use sodiumoxide::crypto::secretstream::{Pull, Stream};
use tokio::{fs::File, io::AsyncRead};

#[pin_project]
struct Streamer {
    crypto_stream: Stream<Pull>,
    #[pin]
    file: File,
}

impl tokio::stream::Stream for Streamer {
    type Item = Result<actix_web::web::Bytes, actix_web::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        let mut buffer = [0; BUFFER_LENGTH];

        if this.crypto_stream.is_not_finalized() {
            match this.file.poll_read(cx, &mut buffer) {
                Poll::Ready(res) => match res {
                    Ok(bytes_read) if bytes_read > 0 => {
                        let value = this.crypto_stream.pull(&buffer, None);
                        match value {
                            Ok((decrypted, _tag)) => Poll::Ready(Some(Ok(decrypted.into()))),
                            Err(_) => Poll::Ready(Some(Err(
                                actix_web::error::ErrorInternalServerError("Incorrect password"),
                            ))),
                        }
                    }
                    Err(err) => {
                        Poll::Ready(Some(Err(actix_web::error::ErrorInternalServerError(err))))
                    }
                    _ => Poll::Ready(Some(Err(actix_web::error::ErrorInternalServerError("Decryption error")))),
                },
                Poll::Pending => Poll::Pending,
            }
        } else {
            // Stream finishes when it returns None
            Poll::Ready(None)
        }
    }
}

并从您的处理程序中使用它:

let in_file = File::open(FILE_NAME).await?;
let stream = Stream::init_pull(&header, &key)?;

let stream = Streamer {
    crypto_stream: stream,
    file: in_file,
};
HttpResponse::Ok()
//    .content_type("text/text")
    .streaming(stream)

请注意,您需要将 pin_project 和 tokio 与 ["stream", "fs"] 作为依赖项才能正常工作。

于 2020-10-22T16:26:54.420 回答