1

问题陈述:消耗来自 Kafka 的百万条记录并旋转并行 API 调用(120 TPS)

我正在使用项目反应器 kafka 来处理 Kafka 消息消耗(每小时 200 万条记录)。一旦我收到 kafka 消息,我需要将并行 API 调用(10 TPS)旋转到“abc.com/actuator”。我测试了 kafka 部分 .. 我能够在 20 分钟内消耗数百万条记录(使用 4 个 Kubernetes Pod)。但是当我旋转 API 调用时,一切都是按顺序进行的, 而不是并行的。此外,API 需要 1000 毫秒才能返回响应(这会增加等待时间)。有人可以帮助了解并行 API 调用有什么问题吗?提前致谢。

ReceiverOptions<Integer, String> options =
        receiverOptions
            .subscription(Collections.singleton(topic))
            .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
            .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
    final Flux<ReceiverRecord<Integer, String>> messages = Flux.defer() -> {
    final Flux<ReceiverRecord<Integer,String>> receiver = 
    kafkaReceicer.create(options).receive();
    return Flux.<ReceiverRecord<Integer,String>>create(emmitter -> {
           kafkaFlux.doOnNext(record-> {
            ReceiverOffset offset = record.receiverOffset();
            offset.acknowledge();
            emitter.next(record);
       }).blockLast();
  });
});

WebClient wc = WebClient.create("abc.com:8443");
Flux.from(messages).flatMap(event -> wc.get().uri("/actuator").retrieve().bodyToMono(String.class)
.parallel(10).runOn(Schedulers.parallel()).subscribe();

Kubernetes 配置

CPU:300m 内存:10Gi

4

0 回答 0