问题陈述:消耗来自 Kafka 的百万条记录并旋转并行 API 调用(120 TPS)
我正在使用项目反应器 kafka 来处理 Kafka 消息消耗(每小时 200 万条记录)。一旦我收到 kafka 消息,我需要将并行 API 调用(10 TPS)旋转到“abc.com/actuator”。我测试了 kafka 部分 .. 我能够在 20 分钟内消耗数百万条记录(使用 4 个 Kubernetes Pod)。但是当我旋转 API 调用时,一切都是按顺序进行的, 而不是并行的。此外,API 需要 1000 毫秒才能返回响应(这会增加等待时间)。有人可以帮助了解并行 API 调用有什么问题吗?提前致谢。
ReceiverOptions<Integer, String> options =
receiverOptions
.subscription(Collections.singleton(topic))
.addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
.addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
final Flux<ReceiverRecord<Integer, String>> messages = Flux.defer() -> {
final Flux<ReceiverRecord<Integer,String>> receiver =
kafkaReceicer.create(options).receive();
return Flux.<ReceiverRecord<Integer,String>>create(emmitter -> {
kafkaFlux.doOnNext(record-> {
ReceiverOffset offset = record.receiverOffset();
offset.acknowledge();
emitter.next(record);
}).blockLast();
});
});
WebClient wc = WebClient.create("abc.com:8443");
Flux.from(messages).flatMap(event -> wc.get().uri("/actuator").retrieve().bodyToMono(String.class)
.parallel(10).runOn(Schedulers.parallel()).subscribe();
Kubernetes 配置:
CPU:300m 内存:10Gi