我正在使用 Spring 5,详细介绍 Reactor 项目,从庞大的 Mongo 集合中读取信息到 Kafka 主题。不幸的是,Kafka 消息的生成比使用它们的程序快得多。所以,我需要实现一些背压机制。
假设我想要每秒 100 条消息的吞吐量。谷歌搜索了一下,我决定结合该buffer(int maxSize)
方法的特性,将结果与使用预定义间隔发出消息的 a一起压缩。Flux
// Create a clock that emits an event every second
final Flux<Long> clock = Flux.interval(Duration.ofMillis(1000L));
// Create a buffered producer
final Flux<ProducerRecord<String, Data>> outbound =
repository.findAll()
.map(this::buildData)
.map(this::createKafkaMessage)
.buffer(100)
// Limiting the emission in time interval
.zipWith(clock, (msgs, tick) -> msgs)
.flatMap(Flux::fromIterable);
// Subscribe a Kafka sender
kafkaSender.createOutbound()
.send(outbound)
.then()
.block();
有没有更聪明的方法来做到这一点?我的意思是,在我看来,它有点复杂(总体而言,拉链部分)。