我正在尝试编写一个可以从同一服务器执行获取请求的 tokio 事件循环,具有以下特征:
- 应该使用连接池
- get请求一般比较慢(>1s),所以需要并行执行
- 服务器可能没有响应,所以我需要超时。如果未收到请求,请再次发送
- 轮询接收器以获取必须下载的新 URL。它们应该被添加到事件循环中
到目前为止,在我的尝试中,我已经设法让这 4 个项目的不同组合起作用,但从来没有一起工作。我的主要问题是我不太明白如何向 tokio 事件循环添加新的期货。
我假设我需要使用loop_fn
轮询接收器的主循环并handle.spawn
生成新任务?handle.spawn
只允许未来Result<(),()>
,所以我不能使用它的输出在失败时重新生成作业,所以我需要将重试检查移到那个未来?
以下是批量接受和处理 url 的尝试(因此没有连续轮询),并且有超时(但没有重试):
fn place_dls(&mut self, reqs: Vec<String>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let timeout = Timeout::new(Duration::from_millis(5000), &handle).unwrap();
let send_dls = stream::iter_ok::<_, reqwest::Error>(reqs.iter().map(|o| {
// send with request through an async reqwest client in self
}));
let rec_dls = send_dls.buffer_unordered(dls.len()).for_each(|n| {
n.into_body().concat2().and_then(|full_body| {
debug!("Received: {:#?}", full_body);
// TODO: how to put the download back in the queue if failure code is received?
})
});
let work = rec_dls.select2(timeout).then(|res| match res {
Ok(Either::A((got, _timeout))) => {
Ok(got)
},
Ok(Either::B((_timeout_error, _get))) => {
// TODO: put back in queue
Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client timed out while connecting",
).into())
}
Err(Either::A((get_error, _timeout))) => Err(get_error.into()),
Err(Either::B((timeout_error, _get))) => Err(timeout_error.into()),
});
core.run(work);
}
loop_fn
可悲的是,我的尝试失败了。