完成 Flux 后如何启动 Flux?Flux 不依赖于 Flux 的结果。
我认为操作 then() 应该可以完成工作,但它只是返回单声道。那么正确的运算符是什么?
完成 Flux 后如何启动 Flux?Flux 不依赖于 Flux 的结果。
我认为操作 then() 应该可以完成工作,但它只是返回单声道。那么正确的运算符是什么?
Flux.thenMany是您需要的运算符
abstract Flux<Integer> flux1();
abstract Flux<String> flux2();
public Flux<String> flux2AfterFlux1(){
return flux1().thenMany(flux2());
}
我的回复迟了,所以你可能已经弄清楚了,或者继续前进。我需要做同样的事情,所以我似乎通过使用Flux.usingWhen()
. 我需要从我的数据库中获取具有特定属性的 ID。检索到这些 ID 后,我需要获取有关这些 ID 的一些信息。通常,连接会起作用,但我使用的是 MongoDB,而且连接很糟糕,所以最好进行单独的顺序查询。因此,假设我有一个 Mongo 反应式存储库,其方法名为Flux<String> findInterestingIds()
. 我在同一个存储库中有另一个方法Flux<String> findInterestingInformation(String[] interestingIds)
,我需要将调用中找到的 ID 提供给前一个方法。所以这就是我的处理方式:
Mono<List<String>> interestingIdsMono = myRepo.findInterestingIds()
.collectList();
Flux.usingWhen(interestingIdsMono,
interestingIds -> myRepo.findInterestingInformation(interestingIds),
interestingIds -> Flux.empty().doOnComplete(() -> log.debug("Complete"));
清理函数(第三个参数)是我还不太了解如何以一种好的方式使用的东西,所以我只记录了完成,当通量完成时我不需要发出任何额外的东西。
ProjectReactor 曾经有一个 compose 函数,现在称为 transformDeferred,我对此抱有一些希望,但我不太明白它在这种情况下如何应用。无论如何,这是我的尝试,所以请随时向我展示更好的方法!