0

下面对通量发射的元素进行并行计算的实现有什么区别。

Flux.fromIterable(list)
                .parallel()
                .runOn(Schedulers.boundedElastic())
                .flatMap(items -> Mono.fromCallable(() -> blocking database call).subscribeOn(Schedulers.boundedElastic()))
                .map(dbResponse -> dbResponse.stream().map(singleObj-> createAdifferentObject(dbResponse)).collect(Collectors.toList())
                .block()

Flux.fromIterable(list)
                .flatMap(items -> Mono.fromCallable(() -> blocking database call).subscribeOn(Schedulers.boundedElastic()))
                .map(dbResponse -> dbResponse.stream().map(singleObj-> createAdifferentObject(dbResponse)).collect(Collectors.toList())
                .block()

对于第一段代码,我提到了这个块

  return Flux.fromIterable(urls)
    .flatMap(url -> 
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in an boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}

来自这个博客https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers

对于第二段代码,一般在我们的团队或组织中,每个人都使用它。该文档说,调度程序。(没有新词)从此链接https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html创建一个共享实例,但就流程而言,那不应该在flatMap之外吗?如果不是,它是如何工作的?

4

0 回答 0