我想定义一个使用 Reactor Kafka 消耗 kafka 并写入 MongoDB 的流,并且只有在成功时才会将 ID 写入 Kafka。我正在将 Project Reactor 与 Spring Integration JavaDSL 一起使用,并且我希望有一个FlowBuilder
类可以在较高级别上定义我的管道。我目前有以下方向:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf
.handle(MongoDb.reactiveOutboundChannelAdapter()))
.handle(writeToKafka)
.get();
}
我在文档中看到支持另一种方法,该方法也适用于 Project Reactor。这种方法不包括使用IntegrationFlows
. 这看起来像这样:
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Mono<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(integers -> ...);
我想知道在使用这两个库时更推荐的处理方式是什么。我想知道如何在第二个示例中使用 Reactive MongoDB 适配器。我不确定如果没有IntegrationFlows
包装器,第二种方法是否可行。