0

我正在尝试使用 Kafka 在用 Rust 和 Node.js 编写的 2 个微服务之间进行通信。

我使用 actix-web 作为 Web 框架,使用 rdkafka 作为 Rust 的 Kafka 客户端。在 Node.js 方面,它从数据库中查询内容并通过 Kafka 将其作为 JSON 返回到 Rust 服务器。

流量:

Request -> Actix Web -> Kafka -> Node -> Kafka -> Actix Web -> Response

逻辑是请求到达 Actix Web 上的端点,然后创建一条消息向另一个微服务请求某些内容,并等待它发回(通过 Kafka 消息密钥验证),并将其作为 HTTP 响应返回给用户。


我让它工作了,但性能很慢(我正在用 进行压力测试wrk)。

我不确定为什么它的执行速度很慢,但是当我深入挖掘时,我发现如果我在 Node.js 端添加延迟 5 秒,然后我向 actix-web 创建 2 个请求,其中请求相差一秒,它将以 5 秒和 10 秒的延迟响应。

基准是每秒大约 3k 个请求,使用以下命令:

wrk http://localhost:8080 -d 20s -t 2 -c 200

这让我猜想某些东西可能会阻塞每个请求的线程。

这是源代码回购

use std::{
    sync::Arc,
    time::{ 
        Duration, 
        Instant
    }
};

use actix_web::{
    App, 
    HttpServer, 
    get, 
    rt,
    web::Data
};

use futures::TryStreamExt;
use tokio::time::sleep;

use num_cpus;
use rand::{
    distributions::Alphanumeric, 
    Rng
};

use rdkafka::{
    ClientConfig, 
    Message, 
    consumer::{
        Consumer, 
        StreamConsumer
    }, 
    producer::{
        FutureProducer, 
        FutureRecord
    }
};

const TOPIC: &'static str = "exp-queue_general-5";

#[derive(Clone)]
pub struct AppState {
    pub producer: Arc<FutureProducer>,
    pub receiver: flume::Receiver<String>
}

fn generate_key() -> String {
    rand::thread_rng()
        .sample_iter(&Alphanumeric)
        .take(8)
        .map(char::from)
        .collect()
}

#[get("/")]
async fn landing(state: Data<AppState>) -> String {
    let key = generate_key();
    let t1 = Instant::now();

    let producer = &state.producer;
    let receiver = &state.receiver;

    producer
        .send(
            FutureRecord::to(&format!("{}-forth", TOPIC))
                .key(&key)
                .payload("Hello From Rust"),
                Duration::from_secs(8)
        )
        .await
        .expect("Unable to send message");

    println!("Producer take {} ms", t1.elapsed().as_millis());
    
    let t2 = Instant::now();
    let value = receiver
        .recv()
        .unwrap_or("".to_owned());

    println!("Receiver take {} ms", t2.elapsed().as_millis());
    println!("Process take {} ms\n", t1.elapsed().as_millis());

    value
}

#[get("/status")]
async fn heartbeat() -> &'static str {
    // ? Concurrency delay check
    sleep(Duration::from_secs(1)).await;

    "Working"
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // ? Assume that the whole node is just Rust instance
    let mut cpus = num_cpus::get() / 2 - 1;

    if cpus < 1 {
        cpus = 1;
    }

    println!("Cpus {}", cpus);
    
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("linger.ms", "25")
        .set("queue.buffering.max.messages", "1000000")
        .set("queue.buffering.max.ms", "25")
        .set("compression.type", "lz4")
        .set("retries", "40000")
        .set("retries", "0")
        .set("message.timeout.ms", "8000")
        .create()
        .expect("Kafka config");

    let (tx, rx) = flume::unbounded::<String>();

    rt::spawn(async move {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092")
            .set("group.id", &format!("{}-back", TOPIC))
            .set("queued.min.messages", "200000")
            .set("fetch.error.backoff.ms", "250")
            .set("socket.blocking.max.ms", "500")
            .create()
            .expect("Kafka config");

        consumer
            .subscribe(&vec![format!("{}-back", TOPIC).as_ref()])
            .expect("Can't subscribe");

        consumer
            .stream()
            .try_for_each_concurrent(
                cpus,
                |message| {
                    let txx = tx.clone();

                    async move {
                        let result = String::from_utf8_lossy(
                            message
                            .payload()
                            .unwrap_or("Error serializing".as_bytes())
                        ).to_string();

                        txx.send(result).expect("Tx not sending");

                        Ok(())
                    }

                }
            )
            .await
            .expect("Error reading stream");
    });

    let state = AppState {
        producer: Arc::new(producer),
        receiver: rx
    };

    HttpServer::new(move || {
        App::new()
            .app_data(Data::new(state.clone()))
            .service(landing)
            .service(heartbeat)
    })
    .workers(cpus)
    .bind("0.0.0.0:8080")?
    .run()
    .await
}

我在 GitHub 上发现了一些已解决的问题,建议使用演员代替,我也将其作为单独的分支

这比主分支的性能更差,每秒执行大约 200-300 个请求。


我不知道瓶颈在哪里,也不知道是什么阻碍了请求。

4

0 回答 0