我正在尝试使用 Kafka 在用 Rust 和 Node.js 编写的 2 个微服务之间进行通信。
我使用 actix-web 作为 Web 框架,使用 rdkafka 作为 Rust 的 Kafka 客户端。在 Node.js 方面,它从数据库中查询内容并通过 Kafka 将其作为 JSON 返回到 Rust 服务器。
流量:
Request -> Actix Web -> Kafka -> Node -> Kafka -> Actix Web -> Response
逻辑是请求到达 Actix Web 上的端点,然后创建一条消息向另一个微服务请求某些内容,并等待它发回(通过 Kafka 消息密钥验证),并将其作为 HTTP 响应返回给用户。
我让它工作了,但性能很慢(我正在用 进行压力测试wrk
)。
我不确定为什么它的执行速度很慢,但是当我深入挖掘时,我发现如果我在 Node.js 端添加延迟 5 秒,然后我向 actix-web 创建 2 个请求,其中请求相差一秒,它将以 5 秒和 10 秒的延迟响应。
基准是每秒大约 3k 个请求,使用以下命令:
wrk http://localhost:8080 -d 20s -t 2 -c 200
这让我猜想某些东西可能会阻塞每个请求的线程。
use std::{
sync::Arc,
time::{
Duration,
Instant
}
};
use actix_web::{
App,
HttpServer,
get,
rt,
web::Data
};
use futures::TryStreamExt;
use tokio::time::sleep;
use num_cpus;
use rand::{
distributions::Alphanumeric,
Rng
};
use rdkafka::{
ClientConfig,
Message,
consumer::{
Consumer,
StreamConsumer
},
producer::{
FutureProducer,
FutureRecord
}
};
const TOPIC: &'static str = "exp-queue_general-5";
#[derive(Clone)]
pub struct AppState {
pub producer: Arc<FutureProducer>,
pub receiver: flume::Receiver<String>
}
fn generate_key() -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect()
}
#[get("/")]
async fn landing(state: Data<AppState>) -> String {
let key = generate_key();
let t1 = Instant::now();
let producer = &state.producer;
let receiver = &state.receiver;
producer
.send(
FutureRecord::to(&format!("{}-forth", TOPIC))
.key(&key)
.payload("Hello From Rust"),
Duration::from_secs(8)
)
.await
.expect("Unable to send message");
println!("Producer take {} ms", t1.elapsed().as_millis());
let t2 = Instant::now();
let value = receiver
.recv()
.unwrap_or("".to_owned());
println!("Receiver take {} ms", t2.elapsed().as_millis());
println!("Process take {} ms\n", t1.elapsed().as_millis());
value
}
#[get("/status")]
async fn heartbeat() -> &'static str {
// ? Concurrency delay check
sleep(Duration::from_secs(1)).await;
"Working"
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// ? Assume that the whole node is just Rust instance
let mut cpus = num_cpus::get() / 2 - 1;
if cpus < 1 {
cpus = 1;
}
println!("Cpus {}", cpus);
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("linger.ms", "25")
.set("queue.buffering.max.messages", "1000000")
.set("queue.buffering.max.ms", "25")
.set("compression.type", "lz4")
.set("retries", "40000")
.set("retries", "0")
.set("message.timeout.ms", "8000")
.create()
.expect("Kafka config");
let (tx, rx) = flume::unbounded::<String>();
rt::spawn(async move {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", &format!("{}-back", TOPIC))
.set("queued.min.messages", "200000")
.set("fetch.error.backoff.ms", "250")
.set("socket.blocking.max.ms", "500")
.create()
.expect("Kafka config");
consumer
.subscribe(&vec![format!("{}-back", TOPIC).as_ref()])
.expect("Can't subscribe");
consumer
.stream()
.try_for_each_concurrent(
cpus,
|message| {
let txx = tx.clone();
async move {
let result = String::from_utf8_lossy(
message
.payload()
.unwrap_or("Error serializing".as_bytes())
).to_string();
txx.send(result).expect("Tx not sending");
Ok(())
}
}
)
.await
.expect("Error reading stream");
});
let state = AppState {
producer: Arc::new(producer),
receiver: rx
};
HttpServer::new(move || {
App::new()
.app_data(Data::new(state.clone()))
.service(landing)
.service(heartbeat)
})
.workers(cpus)
.bind("0.0.0.0:8080")?
.run()
.await
}
我在 GitHub 上发现了一些已解决的问题,建议使用演员代替,我也将其作为单独的分支。
这比主分支的性能更差,每秒执行大约 200-300 个请求。
我不知道瓶颈在哪里,也不知道是什么阻碍了请求。