我有一个flux
由Iterable
8 个元素组成的 ( Flux.fromIterable(..)
)。对于每个通量发射,我想异步调用一个方法。我尝试了各种方法dispatchOn
,publishOn
但都没有奏效,最终我解决了map(CompletableFuture.supplyAsync(..), executor)
将转换flux
为flux<CompletableFuture<Boolean>>
.
现在我只想在最后一项完成时继续流程。我尝试了all(..)
, 和 withtake(size of the Iterable)
但在这两种情况下,在所有元素完成之前流程都会继续。我认为这是因为我的执行程序只有 4 个线程,并且需要一些时间才能将CompletableFuture
-s 添加到通量中。
为什么不all(..)
或take(8)
等待助焊剂完成?我怎样才能让它等待?
编码:
Mono
.fromFuture(dbUtil.getEntity(id))
.doOnError(t -> {
...
return;})
.doOnSuccess(s -> log.info("Got it: " + s))
.flatMap( s ->
Flux.fromIterable(s.getItemsMap().entrySet())
.map( e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR))
.take(s.getItemsMap().entrySet().size())
)
.all(...)
.consume(b -> done(b));