我如何限制并行执行的任务线程?问题很简单——当我的调度程序工作时,我不能做任何其他事情(使用邮递员等获取一些信息)。有没有办法解决这个问题?此外,我尝试设置不断变化的线程数,例如,使用 parallel(3).runOn(Schedulers.parallel()) 并且我的程序仍然被阻止。
@Scheduled(fixedRate = 60000L)
@PostConstruct
public void fillMap() {
Flux.fromIterable(proxyParserService.getProxyList())
.parallel()
.runOn(Schedulers.parallel())
.flatMap(geoDataService::getData)
//some logic here...
还值得一提的是,我有 flatmap 方法可以并行打开连接:
public Mono<Address> getData(Address proxy) {
WebClient webClient = WebClient.builder()
.baseUrl(String.format(URL, proxy.getHost()))
.build();
WebClient.RequestBodyUriSpec request = webClient.method(HttpMethod.GET);
return request.retrieve()
.onStatus(HttpStatus::isError, clientResponse -> {
log.error("Error while calling endpoint {} with status code {}",
URL, clientResponse.statusCode());
throw new RuntimeException("Error while calling geolocation endpoint");
})
.bodyToMono(Address.class)