我遇到了一个错误,我的 Hyper HTTP 响应被截断为特定大小(7829 字节)。使用 cURL 发出相同的请求可以正常工作。
该请求向一个 JSON 端点查询数据。之后响应结构体会被多次传递和移动,因为程序使用了一个相对复杂的速率限制流程来一次发出许多这样的请求。但是,即使只发出一个请求,响应仍然会被截断。
在实现速率限制并进行大幅重构之前,程序能够正确地获取这些响应。
我在下面做了一个最小的例子,但它无法重现问题。到了这一步,我不知道该从哪里查起。代码库相当复杂,逐步扩展这个复现示例很困难,尤其是在我不知道可能是什么原因导致问题的情况下。
Hyper 的响应正文可能在哪些情况下被截断?在最小示例中,响应正文通过下面的 handle 函数获取。
#![feature(use_nested_groups)]
extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio_core;
use futures::{Future, Stream};
use hyper::{Body, Chunk, Client, Method, Request, Response};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use std::env;
/// Minimal example: fetches the URI given as the first command-line argument
/// over HTTPS and prints the length of the fully collected response body.
///
/// Panics if no argument is supplied, if the URI does not parse, or if the
/// request fails.
fn main() {
    /// Collects the whole body of `response` and resolves to its length in bytes.
    fn handle(response: Response<Body>) -> Box<Future<Item = usize, Error = hyper::Error>> {
        let length = response.body().concat2().map(|body: Chunk| body.len());
        Box::new(length)
    }

    let mut core = Core::new().unwrap();
    let reactor = core.handle();
    let connector = HttpsConnector::new(4, &reactor).unwrap();
    let client = Client::configure().connector(connector).build(&reactor);

    let args: Vec<String> = env::args().collect();
    let uri = &args[1];
    let req = Request::new(Method::Get, uri.parse().unwrap());

    // Request and body collection are chained and driven by the same `Core`.
    let response_body_length = core.run(client.request(req).and_then(handle)).unwrap();
    println!("response body length: {}", response_body_length);
}
出问题的代码:
extern crate serde;
extern crate serde_json;
use futures::{future, stream, Future, Stream};
use hyper;
use hyper::{client, Body, Chunk, Client, Headers, Method, Request, Response, header::Accept,
header::Date as DateHeader, header::RetryAfter};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use models::Bucket;
use std::thread;
use std::time::{Duration, UNIX_EPOCH};
use std::str;
header! { (XRateLimitRemaining, "x-ratelimit-remaining") => [String] }
/// Newtype around a request URL kept as a plain `String`.
#[derive(Debug)]
struct Uri(pub String);

/// Number of buckets requested per page (the `count` query parameter).
const MAX_REQ_SIZE: u32 = 500;

/// Builds the bucketed-trade URL for `symbol`, addressing page `page_ix`.
///
/// Each page covers `MAX_REQ_SIZE` entries, so the `start` parameter is
/// `MAX_REQ_SIZE * page_ix`.
fn make_uri(symbol: &str, page_ix: u32) -> Uri {
    let first_entry = MAX_REQ_SIZE * page_ix;
    let url = format!(
        "https://www.bitmex.com/api/v1/trade/bucketed?\
         symbol={symbol}&\
         columns={columns}&\
         partial=false&\
         reverse=true&\
         binSize={bin_size}&\
         count={count}&\
         start={start}",
        symbol = symbol,
        columns = "close,timestamp",
        bin_size = "5m",
        count = MAX_REQ_SIZE,
        start = first_entry
    );
    Uri(url)
}
/// Rate-limit state extracted from a response's headers.
#[derive(Debug)]
struct RateLimitInfo {
    /// Parsed value of the `x-ratelimit-remaining` header.
    remaining_reqs: u32,
    /// Delay carried by a `Retry-After` header in its delay form, if any.
    retry_after: Option<Duration>,
}

impl RateLimitInfo {
    /// Starting state: one request allowed, no mandatory wait.
    fn default() -> RateLimitInfo {
        Self {
            retry_after: None,
            remaining_reqs: 1,
        }
    }

    /// Reads the rate-limit headers of `resp`.
    ///
    /// # Panics
    ///
    /// Panics when the `x-ratelimit-remaining` header is missing or does not
    /// parse as a `u32`.
    fn from<T>(resp: &Response<T>) -> RateLimitInfo {
        let headers = resp.headers();
        let remaining_reqs = match headers.get::<XRateLimitRemaining>() {
            Some(remaining) => remaining.parse().unwrap(),
            None => panic!("x-ratelimit-remaining not on request."),
        };
        // Only the delay form of `Retry-After` is used; any other form is ignored.
        let retry_after = if let Some(RetryAfter::Delay(delay)) = headers.get::<RetryAfter>() {
            Some(*delay)
        } else {
            None
        };
        Self {
            remaining_reqs,
            retry_after,
        }
    }
}
/// Returns whichever of the two responses carries the later `Date` header.
///
/// When both dates compare equal, `b` is returned.
///
/// # Panics
///
/// Panics if either response has no `Date` header.
fn resp_dated_later<'a>(a: &'a Response<Body>, b: &'a Response<Body>) -> &'a Response<Body> {
    let date_of = |resp: &Response<Body>| **resp.headers().get::<DateHeader>().unwrap();
    let a_is_later = date_of(a) > date_of(b);
    if a_is_later {
        a
    } else {
        b
    }
}
/// A URI to fetch together with the response received for it, if any.
#[derive(Debug)]
struct Query {
    /// Address the query is sent to.
    uri: Uri,
    /// `None` until a request for `uri` has produced a response.
    response: Option<Response<Body>>,
}

impl Query {
    /// Creates a query for `uri` that has no response yet.
    fn from_uri(uri: Uri) -> Query {
        Self {
            response: None,
            uri,
        }
    }
}
/// Reports whether `q` already holds a response with a success status.
///
/// A query without a response is not good.
fn query_good(q: &Query) -> bool {
    q.response
        .as_ref()
        .map_or(false, |response| response.status().is_success())
}
type HttpsClient = hyper::Client<HttpsConnector<client::HttpConnector>>;
type FutureQuery = Box<Future<Item = Query, Error = hyper::Error>>;
fn to_future(x: Query) -> FutureQuery {
Box::new(future::ok(x))
}
fn exec_if_needed(client: &HttpsClient, query: Query) -> FutureQuery {
fn exec(client: &HttpsClient, q: Query) -> FutureQuery {
println!("exec: {:?}", q);
let uri = q.uri;
let req = {
let mut req = Request::new(Method::Get, uri.0.parse().unwrap());
req.headers_mut().set(Accept::json());
req
};
Box::new(
client
.request(req)
.inspect(|resp| println!("HTTP {}", resp.status()))
.map(|resp| Query {
uri: uri,
response: Some(resp),
}),
)
}
if query_good(&query) {
to_future(query)
} else {
exec(client, query)
}
}
/// Boxed future resolving to `T` or failing with a hyper error.
type BoxedFuture<T> = Box<Future<Item = T, Error = hyper::Error>>;

/// Runs `exec_if_needed` for every query and resolves to the resulting
/// queries, in the same order as the input.
fn do_batch(client: &HttpsClient, queries: Vec<Query>) -> BoxedFuture<Vec<Query>> {
    println!("do_batch() {} queries", queries.len());
    let pending = queries
        .into_iter()
        .map(|query| exec_if_needed(client, query));
    println!("do_batch() futures {:?}", pending);
    // `futures_ordered` yields results in submission order.
    let all_done = stream::futures_ordered(pending).collect();
    Box::new(all_done)
}
/// Removes up to `suggested_n` elements from the front of `right` and returns
/// them in their original order.
///
/// If `right` holds fewer than `suggested_n` elements, all of them are
/// removed and returned.
fn take<T>(right: &mut Vec<T>, suggested_n: usize) -> Vec<T> {
    let count = suggested_n.min(right.len());
    right.drain(..count).collect()
}
type BoxedResponses = Box<Vec<Response<Body>>>;
fn batched_throttle(uris: Vec<Uri>) -> BoxedResponses {
println!("batched_throttle({} uris)", uris.len());
let mut core = Core::new().unwrap();
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());
let mut rate_limit_info = RateLimitInfo::default();
let mut queries_right: Vec<Query> = uris.into_iter().map(Query::from_uri).collect();
loop {
let mut queries_left: Vec<Query> = Vec::with_capacity(queries_right.len());
println!("batched_throttle: starting inner loop");
loop {
// throttle program during testing
thread::sleep(Duration::from_millis(800));
println!("batched_throttle: {:?}", rate_limit_info);
if let Some(retry_after) = rate_limit_info.retry_after {
println!("batched_throttle: retrying after {:?}", retry_after);
thread::sleep(retry_after)
}
if queries_right.is_empty() {
break;
}
let mut queries_mid = {
let ri_count = rate_limit_info.remaining_reqs;
let iter_req_count = if ri_count == 0 { 1 } else { ri_count };
println!("batched_throttle: iter_req_count {}", iter_req_count);
take(&mut queries_right, iter_req_count as usize)
};
println!(
"batched_throttle: \
queries_right.len() {}, \
queries_left.len() {}, \
queries_mid.len() {})",
queries_right.len(),
queries_left.len(),
queries_mid.len()
);
if queries_mid.iter().all(query_good) {
println!("batched_throttle: queries_mid.iter().all(query_good)");
continue;
}
queries_mid = { core.run(do_batch(&client, queries_mid)).unwrap() };
rate_limit_info = {
let create_very_old_response =
|| Response::new().with_header(DateHeader(UNIX_EPOCH.into()));
let very_old_response = create_very_old_response();
let last_resp = queries_mid
.iter()
.map(|q| match &q.response {
Some(r) => r,
_ => panic!("Impossible"),
})
.fold(&very_old_response, resp_dated_later);
RateLimitInfo::from(&last_resp)
};
&queries_left.append(&mut queries_mid);
}
queries_right = queries_left;
if queries_right.iter().all(query_good) {
break;
}
}
println!(
"batched_throttle: finishing. queries_right.len() {}",
queries_right.len()
);
Box::new(
queries_right
.into_iter()
.map(|q| q.response.unwrap())
.collect(),
)
}
fn bucket_count_to_req_count(bucket_count: u32) -> u32 {
let needed_req_count = (bucket_count as f32 / MAX_REQ_SIZE as f32).ceil() as u32;
return needed_req_count;
}
/// Boxed list of buckets.
type BoxedBuckets = Box<Vec<Bucket>>;

/// Collects the whole body of `response` and deserializes it from JSON into a
/// list of buckets, printing the body length and text along the way.
///
/// # Panics
///
/// The returned future panics when polled if the body is not valid UTF-8 or
/// does not deserialize into `Vec<Bucket>`.
fn response_to_buckets(response: Response<Body>) -> BoxedFuture<Vec<Bucket>> {
    /// Logs and parses one fully collected body.
    fn decode(body: Chunk) -> Vec<Bucket> {
        println!("body.len(): {}", body.len());
        println!("JSON: {}", str::from_utf8(&body).unwrap());
        serde_json::from_slice(&body).unwrap()
    }

    let whole_body = response.body().concat2();
    Box::new(whole_body.map(decode))
}
pub fn get_n_last(symbol: &str, bucket_count: u32) -> BoxedBuckets {
let req_count = bucket_count_to_req_count(bucket_count);
let uris = (0..req_count)
.map(|page_ix| make_uri(symbol, page_ix))
.collect();
let responses = batched_throttle(uris);
let mut core = Core::new().unwrap();
let boxed_buckets = {
let futures = responses.into_iter().map(response_to_buckets);
let future = stream::futures_ordered(futures).collect();
let groups_of_buckets = core.run(future).unwrap();
Box::new(
groups_of_buckets
.into_iter()
.flat_map(|bs| bs.into_iter())
.rev()
.collect(),
)
};
return boxed_buckets;
}