0

完成 Flux 后如何启动 Flux?Flux 不依赖于 Flux 的结果。

我认为操作 then() 应该可以完成工作,但它只是返回单声道。那么正确的运算符是什么?

4

2 回答 2

1

Flux.thenMany是您需要的运算符

    abstract Flux<Integer> flux1();
    abstract Flux<String> flux2();

    public Flux<String> flux2AfterFlux1(){
        return flux1().thenMany(flux2());
    }
于 2020-12-31T13:08:47.930 回答
0

我的回复迟了,所以你可能已经弄清楚了,或者继续前进。我需要做同样的事情,所以我似乎通过使用Flux.usingWhen(). 我需要从我的数据库中获取具有特定属性的 ID。检索到这些 ID 后,我需要获取有关这些 ID 的一些信息。通常,连接会起作用,但我使用的是 MongoDB,而且连接很糟糕,所以最好进行单独的顺序查询。因此,假设我有一个 Mongo 反应式存储库,其方法名为Flux<String> findInterestingIds(). 我在同一个存储库中有另一个方法Flux<String> findInterestingInformation(String[] interestingIds),我需要将调用中找到的 ID 提供给前一个方法。所以这就是我的处理方式:

Mono<List<String>> interestingIdsMono = myRepo.findInterestingIds()
    .collectList();
Flux.usingWhen(interestingIdsMono,
    interestingIds -> myRepo.findInterestingInformation(interestingIds),
    interestingIds -> Flux.empty().doOnComplete(() -> log.debug("Complete"));

清理函数(第三个参数)是我还不太了解如何以一种好的方式使用的东西,所以我只记录了完成,当通量完成时我不需要发出任何额外的东西。

ProjectReactor 曾经有一个 compose 函数,现在称为 transformDeferred,我对此抱有一些希望,但我不太明白它在这种情况下如何应用。无论如何,这是我的尝试,所以请随时向我展示更好的方法!

于 2020-12-30T17:27:10.883 回答