下面对通量发射的元素进行并行计算的实现有什么区别。
Flux.fromIterable(list)
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(items -> Mono.fromCallable(() -> blocking database call).subscribeOn(Schedulers.boundedElastic()))
.map(dbResponse -> dbResponse.stream().map(singleObj-> createAdifferentObject(dbResponse)).collect(Collectors.toList())
.block()
和
Flux.fromIterable(list)
.flatMap(items -> Mono.fromCallable(() -> blocking database call).subscribeOn(Schedulers.boundedElastic()))
.map(dbResponse -> dbResponse.stream().map(singleObj-> createAdifferentObject(dbResponse)).collect(Collectors.toList())
.block()
对于第一段代码,我提到了这个块
return Flux.fromIterable(urls)
.flatMap(url ->
//wrap the blocking call in a Mono
Mono.fromCallable(() -> blockingWebClient.get(url))
//ensure that Mono is subscribed in an boundedElastic Worker
.subscribeOn(Schedulers.boundedElastic())
); //each individual URL fetch runs in its own thread!
}
来自这个博客https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers
对于第二段代码,一般在我们的团队或组织中,每个人都使用它。该文档说,调度程序。(没有新词)从此链接https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html创建一个共享实例,但就流程而言,那不应该在flatMap之外吗?如果不是,它是如何工作的?