根据您的工作流程(您的数据块有多大,解密需要多长时间等),您可能对如何制作流有不同的选择。我想到的最合法的方式是使用某种带有通道的线程池在线程和您的处理程序之间进行通信。在这种情况下, Tokio 的mpsc可以是一个选项,并且它的Receiver已经实现了 Stream 并且您可以使用线程中的Sender的 try_send 从线程中提供它,考虑到您使用的是无界通道或有足够长度的有界通道,它应该工作。
如果您的解密过程不那么耗时而被视为阻塞,或者您只是想看看如何为 actix 实现 Stream,则另一种可能的选择是,这是一个示例:
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_web::{get, App, HttpResponse, HttpServer, Responder};
use pin_project::pin_project;
use sodiumoxide::crypto::secretstream::{Pull, Stream};
use tokio::{fs::File, io::AsyncRead};
#[pin_project]
struct Streamer {
crypto_stream: Stream<Pull>,
#[pin]
file: File,
}
impl tokio::stream::Stream for Streamer {
type Item = Result<actix_web::web::Bytes, actix_web::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let mut buffer = [0; BUFFER_LENGTH];
if this.crypto_stream.is_not_finalized() {
match this.file.poll_read(cx, &mut buffer) {
Poll::Ready(res) => match res {
Ok(bytes_read) if bytes_read > 0 => {
let value = this.crypto_stream.pull(&buffer, None);
match value {
Ok((decrypted, _tag)) => Poll::Ready(Some(Ok(decrypted.into()))),
Err(_) => Poll::Ready(Some(Err(
actix_web::error::ErrorInternalServerError("Incorrect password"),
))),
}
}
Err(err) => {
Poll::Ready(Some(Err(actix_web::error::ErrorInternalServerError(err))))
}
_ => Poll::Ready(Some(Err(actix_web::error::ErrorInternalServerError("Decryption error")))),
},
Poll::Pending => Poll::Pending,
}
} else {
// Stream finishes when it returns None
Poll::Ready(None)
}
}
}
并从您的处理程序中使用它:
let in_file = File::open(FILE_NAME).await?;
let stream = Stream::init_pull(&header, &key)?;
let stream = Streamer {
crypto_stream: stream,
file: in_file,
};
HttpResponse::Ok()
// .content_type("text/text")
.streaming(stream)
请注意,您需要将 pin_project 和 tokio 与 ["stream", "fs"] 作为依赖项才能正常工作。