1

我在 Iron 处理程序中发出客户请求。如何重复使用 TokioCore和 Hyper 的Client?我正在使用超 0.11.0 和 tokio-core 0.1。

fn get_result(req: &mut Request) -> IronResult<Response> {
    let mut payload = String::new();
    req.body.read_to_string(&mut payload).unwrap();

    // can we re-use core and client somehow. Making then global with lazy_static!() does not work.
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let client = Client::new(&core.handle());

    let uri = "http://host:port/getResult".parse().unwrap();
    let mut req: hyper::Request = hyper::Request::new(hyper::Method::Post, uri);
    req.headers_mut().set(ContentType::json());
    req.headers_mut().set(ContentLength(payload.len() as u64));
    req.set_body(payload);

    let mut results: Vec<RequestFormat> = Vec::new();
    let work = client.request(req).and_then(|res| {
        res.body().for_each(|chunk| {
            let re: ResultFormat = serde_json::from_slice(&chunk).unwrap();
            results.push(re);
            Ok(())
        })
    });

    Ok(Response::with(
        (iron::status::Ok, serde_json::to_string(&results).unwrap()),
    ))
}
4

1 回答 1

1

我创建了一个包装客户端和核心的下载器类。下面是片段。

use hyper;
use tokio_core;
use std::sync::{mpsc};
use std::thread;
use futures::Future;
use futures::stream::Stream;
use std::time::Duration;
use std::io::{self, Write};
use time::precise_time_ns;
use hyper::Client;

pub struct Downloader {
    sender : mpsc::Sender<(hyper::Request, mpsc::Sender<hyper::Chunk>)>,
    #[allow(dead_code)]
    tr : thread::JoinHandle<hyper::Request>,
}
impl Downloader {
    pub fn new() -> Downloader {
        let (sender, receiver) = mpsc::channel::<(hyper::Request,mpsc::Sender<hyper::Chunk>)>();
        let tr = thread::spawn(move||{
            let mut core = tokio_core::reactor::Core::new().unwrap();
            let client =  Client::new(&core.handle());
            loop {
                let (req , sender) = receiver.recv().unwrap();
                let begin = precise_time_ns();
                let work = client.request(req)   
                .and_then(|res| {
                    res.body().for_each(|chunk| {

                        sender.send(chunk)
                        .map_err(|e|{
                            //io::sink().write(&chunk).unwrap();
                            io::Error::new(io::ErrorKind::Other, e)
                        })?;
                        Ok(())
                    })
                    //sender.close();
                //res.body().concat2()
                });
            core.run(work).map_err(|e|{println!("Error Is {:?}", e);});
           //This time prints same as all request processing time. 
            debug!("Time taken In Download {:?} ms", (precise_time_ns() - begin) / 1000000);
            }
        });
        Downloader{sender,
                tr,
        }
    }

    pub fn download(&self, req : hyper::Request, results:  mpsc::Sender<Vec<u8>>){
        self.sender.send((req, results)).unwrap();
    }
}

现在这个类的客户端可以有一个静态变量。

lazy_static!{
    static ref DOWNLOADER : Mutex<downloader::Downloader> = 
Mutex::new(downloader::Downloader::new());
}
let (sender, receiver) = mpsc::channel();
DOWNLOADER.lock().unwrap().download(payload, sender);

然后通过接收通道读取。可能需要使用 sender.drop() 关闭发送方通道

于 2017-07-17T17:14:58.437 回答