3

我正在使用 Spring 5,详细介绍 Reactor 项目,从庞大的 Mongo 集合中读取信息到 Kafka 主题。不幸的是,Kafka 消息的生成比使用它们的程序快得多。所以,我需要实现一些背压机制。

假设我想要每秒 100 条消息的吞吐量。谷歌搜索了一下,我决定结合该buffer(int maxSize)方法的特性,将结果与使用预定义间隔发出消息的 a一起压缩。Flux

 // Create a clock that emits an event every second
 final Flux<Long> clock = Flux.interval(Duration.ofMillis(1000L));
 // Create a buffered producer
 final Flux<ProducerRecord<String, Data>> outbound =
            repository.findAll()
                      .map(this::buildData)
                      .map(this::createKafkaMessage)
                      .buffer(100)
                      // Limiting the emission in time interval
                      .zipWith(clock, (msgs, tick) -> msgs)
                      .flatMap(Flux::fromIterable);
 // Subscribe a Kafka sender
 kafkaSender.createOutbound()
            .send(outbound)
            .then()
            .block();

有没有更聪明的方法来做到这一点?我的意思是,在我看来,它有点复杂(总体而言,拉链部分)。

4

1 回答 1

2

是的,你可以直接使用delayElements(Duration.ofSeconds(1))操作,不需要 zipWith 。反应堆酷项目总是有增强的,因为它是一个持续的升级,所以让我们保持粘性:) 希望有帮助!

于 2018-03-19T14:39:56.950 回答