5

我正在使用hyper 0.12 来构建代理服务。当从上游服务器接收到响应正文时,我想尽快将其转发回客户端,并将内容保存在缓冲区中以供以后处理。

所以我需要一个函数:

  • 需要 a Streamhyper::Body准确地说是 a )
  • 返回Stream与输入流功能相同的a
  • 当输出流被完全消耗时,还返回某种Future<Item = Vec<u8>, Error = ...>通过输入流的缓冲内容解析的内容

我无法为我的生活弄清楚如何做到这一点。

我想我正在寻找的功能看起来像这样:

type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let body2 = ... // ???
    let buffer = body.fold(Vec::<u8>::new(), |mut buf, chunk| {
        buf.extend_from_slice(&chunk);
        // ...somehow send this chunk to body2 also?
    });
    (body2, buffer);
}

以下是我尝试过的,它一直有效,直到send_data()失败(显然)。

type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let (mut sender, body2) = hyper::Body::channel();
    let consume =
        body.map_err(|_| ()).fold(Vec::<u8>::new(), move |mut buf, chunk| {
            buf.extend_from_slice(&chunk);

            // What to do if this fails?
            if sender.send_data(chunk).is_err() {}
            Box::new(future::ok(buf))
        });

    (body2, Box::new(consume));
}

但是,有些事情告诉我我走错了路。

我发现Sink.fanout()这似乎是我想要的,但我没有Sink,而且我不知道如何构建一个。hyper::Body实现Stream但不是Sink

4

1 回答 1

5

我最终做的是实现一种新类型的流来满足我的需要。这似乎是必要的,因为hyper::Body没有实现Sink也没有hyper::Chunk实现Clone(这是必需的Sink.fanout()),所以我不能使用任何现有的组合器。

首先是一个结构,其中包含我们需要的所有详细信息和附加新块的方法,以及通知缓冲区已完成。

struct BodyClone<T> {
    body: T,
    buffer: Option<Vec<u8>>,
    sender: Option<futures::sync::oneshot::Sender<Vec<u8>>>,
}

impl BodyClone<hyper::Body> {
    fn flush(&mut self) {
        if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
            if sender.send(buffer).is_err() {}
        }
    }

    fn push(&mut self, chunk: &hyper::Chunk) {
        use hyper::body::Payload;

        let length = if let Some(buffer) = self.buffer.as_mut() {
            buffer.extend_from_slice(chunk);
            buffer.len() as u64
        } else {
            0
        };

        if let Some(content_length) = self.body.content_length() {
            if length >= content_length {
                self.flush();
            }
        }
    }
}

然后我实现了Stream这个结构的特征。

impl Stream for BodyClone<hyper::Body> {
    type Item = hyper::Chunk;
    type Error = hyper::Error;

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        match self.body.poll() {
            Ok(Async::Ready(Some(chunk))) => {
                self.push(&chunk);
                Ok(Async::Ready(Some(chunk)))
            }
            Ok(Async::Ready(None)) => {
                self.flush();
                Ok(Async::Ready(None))
            }
            other => other,
        }
    }
}

最后我可以定义一个扩展方法hyper::Body

pub type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()> + Send>;

trait CloneBody {
    fn clone_body(self) -> (hyper::Body, BufferFuture);
}

impl CloneBody for hyper::Body {
    fn clone_body(self) -> (hyper::Body, BufferFuture) {
        let (sender, receiver) = futures::sync::oneshot::channel();

        let cloning_stream = BodyClone {
            body: self,
            buffer: Some(Vec::new()),
            sender: Some(sender),
        };

        (
            hyper::Body::wrap_stream(cloning_stream),
            Box::new(receiver.map_err(|_| ())),
        )
    }
}

这可以按如下方式使用:

let (body: hyper::Body, buffer: BufferFuture) = body.clone_body();
于 2018-08-08T11:21:23.373 回答