我正在使用 Reactor 从 Kafka 主题中读取数据。每条消息的细化请求对 MongoDB 的查询,这比从 Kafka 主题读取消息要慢。因此,我对流应用了背压处理。
receiver.receive()
// Limiting the reading operation
.limitRate(50)
// processMessage accesses to the database
.flatMap(this::processMessage)
.publish()
// Simplification here
.subscribe();
我正在使用 aConnectableFlux
来为生产者提供多个订阅者KafkaReceiver
。KafkaReceiver
本机不允许超过一个订阅者。
我需要测试我的代码是否正确地将背压应用于流。我怎么能做到这一点,使用一些集成测试?
谢谢大家。