0

我正在对 DB 进行多次单声道调用。并且需要所有 Mono 响应的结果来计算在声明的 Mono 逻辑之后编写的最终结果。

if (SomeObject.getAccountLevelActiveList() != null) {

                SomeObject.getAccountLevelActiveList().parallelStream().forEach(account -> {
                    Mono<SubLine> subLineMono= SubLineService
                            .getLineLevelCustProfile(preNbsLineLevelConverter.getSubLine(account ));
                

                    subLineMono.subscribe(subLine-> PollObject.getSubList()
                            .put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine)));

                });

            }

但是我的主要逻辑是在单声道结果​​存储到PollObject之前执行。所以我在PollObject中得到了 null 。所以我想停止我的主线程,直到 Mono 结果存储到PollObject中。

4

1 回答 1

1

如果你想停止主线程,那么你可以使用阻塞而不是订阅,但你必须先转换ListFlux然后flatMap-it 使用提供Mono。您在subscribe方法中拥有的逻辑可以移动到副作用运算符doOnNextMono或包装Flux

Flux.fromIterable(SomeObject.getAccountLevelActiveList())
    .flatMap(account ->
        SubLineService.getLineLevelCustProfile(
            preNbsLineLevelConverter.getSubLine( account ))
    ).doOnNext(subLine ->
        PollObject.getSubList().put(accountLevelMtn.getMtn(),
            Optional.ofNullable(subLine))
    ).blockLast();
    // the following code will be executed first when all monos are completed

如果您的代码if不需要在主线程中运行,那么最好保持反应性,就像@chrylis-cautiouslyoptimistic 已经建议的那样。使用reduce运算符将​​所有结果放在一起,生成当所有提供的单声道完成时完成的单声道:

Flux.fromIterable(SomeObject.getAccountLevelActiveList())
    .flatMap(account ->
        SubLineService.getLineLevelCustProfile(
            preNbsLineLevelConverter.getSubLine( account ))
    ).reduce(PollObject.getSubList(), (subList, subLine) ->
        sublist.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine))
    ).map(subList -> {
        // the code here will be executed first when all monos are completed
    })
    // ... other operators if necessary
    // eventually subscribing or returning the mono for further processing 
    .subscribe();
于 2020-09-28T09:13:25.403 回答