I'm using warp and tokio to build a web server in Rust. Here's what I'm doing:
- I create three tokio runtimes and run three async functions on them, all communicating with each other over channels. I'm doing this because I'm building a web server for model inference, and each part is responsible for one thing (preprocessing and batching, inference from the TF model, and the server itself).
- The response handler for each request passes the data it receives to a function running on another runtime, via an mpsc-style sender (a crossbeam channel in the example below), a clone of which is given to every handler. It also passes along a oneshot Sender on which the other runtime sends the result back to the response handler.
This works fine under moderate load, but under heavy load (50 threads, 100 loops each in my load test) the oneshot receiver in the response handler appears to get dropped, and the server fails to return a result.
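For concreteness, the request pattern that triggers it looks roughly like the sketch below (hypothetical: it assumes the reqwest crate and tokio's "macros"/"rt-multi-thread" features; my actual test used a separate load-testing tool, but the shape is the same, 50 concurrent workers doing 100 requests each):

// Hypothetical load generator, run as a separate binary against the
// server below. `reqwest` is an assumption; the original test used a
// different tool with the same 50-thread / 100-loop pattern.
#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let mut workers = Vec::new();
    for w in 0..50 {
        // 50 concurrent workers...
        let client = client.clone();
        workers.push(tokio::spawn(async move {
            // ...each issuing 100 requests in a loop.
            for i in 0..100 {
                let url = format!("http://localhost:3030/imageId/img-{}-{}", w, i);
                match client.get(url).send().await {
                    Ok(resp) => {
                        let _ = resp.text().await;
                    }
                    Err(e) => eprintln!("request failed: {}", e),
                }
            }
        }));
    }
    for handle in workers {
        let _ = handle.await;
    }
}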
I've attached a minimal reproducible example below:
#![allow(non_snake_case)]
#[macro_use]
extern crate log;
use crossbeam_channel::{unbounded, Receiver, Sender};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use warp::{Filter, Rejection, Reply};

#[derive(Debug, Clone, Deserialize, Serialize)]
struct ServerResponse {
    message: String,
}

impl warp::Reply for ServerResponse {
    fn into_response(self) -> warp::reply::Response {
        warp::reply::json(&self).into_response()
    }
}

#[derive(Debug)]
struct HandlerData {
    image_id: String,
    send_results_oneshot: oneshot::Sender<ServerResponse>,
}

fn main() {
    env_logger::init();
    let (tx_batch_data, rx_batch_data) = unbounded::<Vec<HandlerData>>();
    // One OS thread per runtime: one for the warp server, one for inference.
    let server_thread = std::thread::spawn(move || match Runtime::new() {
        Ok(rt) => {
            rt.block_on(server(tx_batch_data));
        }
        Err(err) => error!("Error initializing runtime for server : {}", err),
    });
    let inference_and_cleanup_thread = std::thread::spawn(move || match Runtime::new() {
        Ok(rt) => {
            rt.block_on(inference_and_cleanup(&rx_batch_data));
        }
        Err(err) => error!("Error initializing runtime for inference : {}", err),
    });
    let _ = server_thread.join();
    let _ = inference_and_cleanup_thread.join();
}

async fn server(tx_handler_data: Sender<Vec<HandlerData>>) {
    let endpoint = warp::path!("imageId" / String)
        .and(warp::any().map(move || tx_handler_data.clone()))
        // Create a fresh oneshot channel per request for getting the result back.
        .and(warp::any().map(move || oneshot::channel::<ServerResponse>()))
        .and_then(response_handler);
    warp::serve(endpoint).run(([0, 0, 0, 0], 3030)).await;
}

async fn response_handler(
    image_id: String,
    tx_handler_data: Sender<Vec<HandlerData>>,
    (send_results_oneshot, get_results): (
        oneshot::Sender<ServerResponse>,
        oneshot::Receiver<ServerResponse>,
    ),
) -> Result<impl Reply, Rejection> {
    // Send the data (plus the oneshot sender for the reply) to the
    // batch-and-preprocess task on the other runtime.
    tx_handler_data
        .send(vec![HandlerData {
            image_id: image_id.clone(),
            send_results_oneshot,
        }])
        .unwrap_or_else(|e| {
            error!(
                "Error while sending the data from response handler! : {}",
                e
            )
        });
    // Wait for the inference task to send the result back.
    let result: ServerResponse = get_results.await.unwrap_or_else(|e| {
        error!(
            "Error getting results from oneshot in response handler: {:?}",
            e
        );
        // dummy val
        ServerResponse {
            message: "from error handler".to_string(),
        }
    });
    Ok(result)
}

async fn inference_and_cleanup(rx_batch_data: &Receiver<Vec<HandlerData>>) {
    loop {
        // Poll the crossbeam channel without blocking.
        let batch_received: Option<Vec<HandlerData>> = rx_batch_data.try_recv().ok();
        if let Some(batch) = batch_received {
            info!("Got a batch of size {}", batch.len());
            // Simulate inference latency.
            tokio::time::sleep(Duration::from_millis(500)).await;
            for ele in batch {
                // tokio::time::sleep(Duration::from_millis(10)).await;
                let oneshot = ele.send_results_oneshot;
                if !oneshot.is_closed() {
                    oneshot
                        .send(ServerResponse {
                            message: "worked successfully".to_string(),
                        })
                        .unwrap_or_else(|e| {
                            error!("Error while sending back results via oneshot : {:?}", e);
                        });
                } else {
                    error!("Didn't send anything, the oneshot receiver was closed");
                }
            }
        }
    }
}
Under load, I keep getting

Didn't send anything, the oneshot receiver was closed

in the logs.
What's going on here? Is it related to how this is architected, or to how warp handles requests? I'd appreciate any help.