0

我正在使用 warp 和 tokio 使我的网络服务器生锈。这就是我正在做的事情:

  • 我正在创建三个 tokio 运行时,并在它们上执行 3 个异步函数,所有这些函数都使用通道相互通信。我这样做是因为我正在制作一个用于模型推理的网络服务器,每个部分负责一件事(预处理和批处理,从 tf 模型推断,以及服务器本身)。
  • 每个请求的响应处理程序通过 mpsc 发送器将它接收到的数据传递给在另一个运行时运行的函数,该发送器的一个克隆被传递给所有处理程序。它还为另一个运行时传递一个单一的 Sender 以将结果发送回响应处理程序。

这适用于中等负载,但在重负载(50 个线程,100 个循环)下,响应处理程序中的 oneshot 接收器似乎被丢弃,服务器无法返回结果。

我在下面附上了一个最小的可重现示例:

#![allow(non_snake_case)]

#[macro_use]
extern crate log;
extern crate chrono;

use crossbeam_channel::{unbounded, Receiver, Sender};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use warp::{Filter, Rejection, Reply};

#[derive(Debug, Clone, Deserialize, Serialize)]
struct ServerResponse {
    message: String,
}
impl warp::Reply for ServerResponse {
    fn into_response(self) -> warp::reply::Response {
        warp::reply::json(&self).into_response()
    }
}

#[derive(Debug)]
struct HandlerData {
    image_id: String,
    send_results_oneshot: oneshot::Sender<ServerResponse>,
}

fn main() {
    env_logger::init();
    let (tx_batch_data, rx_batch_data) = unbounded::<Vec<HandlerData>>();

    let server_thread = std::thread::spawn(move || match Runtime::new() {
        Ok(rt) => {
            rt.block_on(server(tx_batch_data));
        }
        Err(err) => error!("Error initializing runtime for server : {}", err),
    });
    let inference_and_cleanup_thread = std::thread::spawn(move || match Runtime::new() {
        Ok(rt) => {
            rt.block_on(inference_and_cleanup(&rx_batch_data));
        }
        Err(err) => error!("Error initializing runtime for inference : {}", err),
    });
    let _ = server_thread.join();
    let _ = inference_and_cleanup_thread.join();
}

async fn server(tx_handler_data: Sender<Vec<HandlerData>>) {
    let endpoint = warp::path!("imageId" / String)
        .and(warp::any().map(move || tx_handler_data.clone()))
        .and(warp::any().map(move || oneshot::channel::<ServerResponse>()))
        .and_then(response_handler);
    warp::serve(endpoint).run(([0, 0, 0, 0], 3030)).await;
}

async fn response_handler(
    image_id: String,
    tx_handler_data: Sender<Vec<HandlerData>>,
    (send_results_oneshot, get_results): (
        oneshot::Sender<ServerResponse>,
        oneshot::Receiver<ServerResponse>,
    ),
) -> Result<impl Reply, Rejection> {
    // create a oneshot sender and reciever for getting back the results
    // send to batch and preprocess task
    tx_handler_data
        .send(vec![HandlerData {
            image_id: image_id.clone(),
            send_results_oneshot: send_results_oneshot,
        }])
        .unwrap_or_else(|e| {
            error!(
                "Error while sending the data from response handler! : {}",
                e
            )
        });
    let result: ServerResponse = get_results.await.unwrap_or_else(|e| {
        error!(
            "Error getting results from oneshot in response handler: {:?}",
            e
        );
        // dummy val
        ServerResponse {
            message: "from error handler".to_string(),
        }
    });
    Ok(result)
}

async fn inference_and_cleanup(rx_batch_data: &Receiver<Vec<HandlerData>>) {
    loop {
        let batch_received: Option<Vec<HandlerData>> = rx_batch_data.try_recv().ok();
        if let Some(batch) = batch_received {
            info!("Got a batch of size {}", batch.len());
            tokio::time::sleep(Duration::from_millis(500)).await;
            for ele in batch {
                // tokio::time::delay_for(Duration::from_millis(10)).await;
                let oneshot = ele.send_results_oneshot;
                if !oneshot.is_closed() {
                    oneshot
                        .send(ServerResponse {
                            message: "worked successfully".to_string(),
                        })
                        .unwrap_or_else(|e| {
                            error!("Error while sending back results via oneshot : {:?}", e);
                        });
                } else {
                    error!("Didn't send anything, the oneshot reciever was closed");
                }
            }
        }
    }
}

我一直Didn't send anything, the oneshot reciever was closed在加载日志。

这是怎么回事?这与它的架构方式或warp如何处理请求有关吗?我会很感激任何帮助。

4

0 回答 0