0

我有一个fluxIterable8 个元素组成的 ( Flux.fromIterable(..))。对于每个通量发射,我想异步调用一个方法。我尝试了各种方法dispatchOnpublishOn但都没有奏效,最终我解决了map(CompletableFuture.supplyAsync(..), executor)将转换fluxflux<CompletableFuture<Boolean>>.

现在我只想在最后一项完成时继续流程。我尝试了all(..), 和 withtake(size of the Iterable)但在这两种情况下,在所有元素完成之前流程都会继续。我认为这是因为我的执行程序只有 4 个线程,并且需要一些时间才能将CompletableFuture-s 添加到通量中。

为什么不all(..)take(8)等待助焊剂完成?我怎样才能让它等待?

编码:

    Mono
    .fromFuture(dbUtil.getEntity(id))
    .doOnError(t -> {
        ...
        return;})
    .doOnSuccess(s -> log.info("Got it: " + s))
    .flatMap( s -> 
        Flux.fromIterable(s.getItemsMap().entrySet())
            .map( e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR))
            .take(s.getItemsMap().entrySet().size()) 
    )
    .all(...)
    .consume(b -> done(b));
4

2 回答 2

1

如果你想并行计算每个项目,你可以使用 flatMap+just+subscribeOn+map。我对 Reactor 的调度器类型不太熟悉,所以我会给你一个 RxJava 的例子:

ExecutorService exec = ...
Scheduler scheduler = Schedulers.from(exec);

Observable.from(...)
.flatMap(e -> Observable.just(e).subscribeOn(scheduler).map(v -> process(v)))
.subscribe(...);
于 2016-03-23T08:16:11.173 回答
0

我会尝试添加你的 observable

 .finallyDo("here arrive when onComplete or onError")

因为我可以假设您不能为每个迭代器创建一个 observable,然后合并或压缩所有迭代器并订阅它。

   Observable.merge(obsevableIter1(), observableIter2())
              .subscribe(result -> showResult(result.toString()));
于 2016-03-23T09:02:38.870 回答