我正在使用hyper 0.12 来构建代理服务。当从上游服务器接收到响应正文时,我想尽快将其转发回客户端,并将内容保存在缓冲区中以供以后处理。
所以我需要一个函数:
- 需要 a
Stream
(hyper::Body
准确地说是 a ) - 返回
Stream
与输入流功能相同的a - 当输出流被完全消耗时,还返回某种
Future<Item = Vec<u8>, Error = ...>
通过输入流的缓冲内容解析的内容
我无法为我的生活弄清楚如何做到这一点。
我想我正在寻找的功能看起来像这样:
type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
let body2 = ... // ???
let buffer = body.fold(Vec::<u8>::new(), |mut buf, chunk| {
buf.extend_from_slice(&chunk);
// ...somehow send this chunk to body2 also?
});
(body2, buffer);
}
以下是我尝试过的,它一直有效,直到send_data()
失败(显然)。
type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
let (mut sender, body2) = hyper::Body::channel();
let consume =
body.map_err(|_| ()).fold(Vec::<u8>::new(), move |mut buf, chunk| {
buf.extend_from_slice(&chunk);
// What to do if this fails?
if sender.send_data(chunk).is_err() {}
Box::new(future::ok(buf))
});
(body2, Box::new(consume));
}
但是,有些事情告诉我我走错了路。
我发现Sink.fanout()
这似乎是我想要的,但我没有Sink
,而且我不知道如何构建一个。hyper::Body
实现Stream
但不是Sink
。