1

我如何限制并行执行的任务线程?问题很简单——当我的调度程序工作时,我不能做任何其他事情(使用邮递员等获取一些信息)。有没有办法解决这个问题?此外,我尝试设置不断变化的线程数,例如,使用 parallel(3).runOn(Schedulers.parallel()) 并且我的程序仍然被阻止。

@Scheduled(fixedRate = 60000L)
@PostConstruct
public void fillMap() {
    Flux.fromIterable(proxyParserService.getProxyList())
            .parallel()
            .runOn(Schedulers.parallel())
            .flatMap(geoDataService::getData)
//some logic here...

还值得一提的是,我有 flatmap 方法可以并行打开连接:

    public Mono<Address> getData(Address proxy) {
    WebClient webClient = WebClient.builder()
            .baseUrl(String.format(URL, proxy.getHost()))
            .build();
    WebClient.RequestBodyUriSpec request = webClient.method(HttpMethod.GET);
    return request.retrieve()
            .onStatus(HttpStatus::isError, clientResponse -> {
                log.error("Error while calling endpoint {} with status code {}",
                        URL, clientResponse.statusCode());
                throw new RuntimeException("Error while calling geolocation endpoint");
            })
            .bodyToMono(Address.class)
4

0 回答 0