2

我想为每个队列元素执行 http 请求。这些请求应该被并行调用。
我还需要等待所有请求的终止。

我开发了以下代码:

 List<Mono<MyResponseDTO>> monoList = queue.stream()
                .map(jobStatusBunch -> webClient
                        .post()
                        .uri("localhost:8080/api/some/url")
                        .bodyValue(convertToRequestDto(someBean))
                        .retrieve()
                        .toEntity(String.class)
                        .filter(HttpEntity::hasBody)
                        .map(stringResponseEntity -> {
                            try {
                                return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
                            } catch (JsonProcessingException e) {
                                log.error("Can't parse", e);
                                return null;
                            }
                        })
                        .doOnNext(myResponseDTO -> {
                            log.info("doOnNext is invoked");
                        })
                ).collect(Collectors.toList());
          //await when all MONOs are completed

log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
        .flatMap(Function.identity())
        .then();
log.info("Finished waiting for {}", monoList);

当队列具有单个元素时,我会看到以下日志:

2019-11-19 19:17:17.733  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [   scheduling-1] o.s.w.r.f.client.ExchangeFunctions       : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230  INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService     : doOnNext is invoked

所以这段代码不允许等待请求终止。

我怎么能做到呢?

附言

看起来Flux.merge(monoList).blockLast()是我需要的东西。它会正常工作吗?

4

2 回答 2

0

简单案例

使用它来并行执行请求并等待它们完成:

List<Mono<MyResponseDTO>> monoList = queue
        .stream()
        .map(requestDTO ->
                webClient
                    .post()
                    .uri("localhost:8080/api/some/url")
                    .bodyValue(requestDTO)
                    .retrieve()
                    .bodyToMono(MyResponseDTO.class))
        .collect(Collectors.toList());

// This will execute all requests in parallel and wait until they complete,
// or throw an exception if any request fails.
List<MyResponseDTO> responses = Flux.merge(monoList).collectList().block();

确认

您可能希望将日志记录设置reactor.netty.http.clientDEBUG以查看没有发出额外请求。例如,如果您不小心同时使用了mono#subscribe和 ,就会发生这种情况mono#block

CompletableFuture 更复杂的情况

如果您想将响应的处理和等待请求完成分开,那么可以使用 CompletableFutures:

List<Mono<MyResponseDTO>> webClientMonos = getMonos();

// Start executing requests in parallel.
List<CompletableFuture<MyResponseDTO>> futures = webClientMonos.stream()
        .map(mono -> mono.toFuture())
        .collect(toList());
for (CompletableFuture<MyResponseDTO> future : futures) {
    future.thenAccept(responseDTO -> {
        // Do something with a response when it arrives at some point.
        // ...
    });
}

// ...

// Block until all requests have completed.
for (CompletableFuture<MyResponseDTO> future : futures) {
    try {
        // Maybe WebClient has been configured with timeouts,
        // but it doesn't hurt to have a timeout here, too.
        future.get(60, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
    } catch (ExecutionException | TimeoutException ex) {
        // ExecutionException is thrown if HTTP request fails.
        throw new RuntimeException(ex);
    }
}
于 2021-03-26T16:48:56.477 回答
0

您可以尝试以下方法:

Flux<MyResponseDTO> responses = queue.stream()
     .flatMap(jobStatusBunch -> webClient
                    .post()
                    .uri("localhost:8080/api/some/url")
                    .bodyValue(convertToRequestDto(someBean))
                    .retrieve()
                    .toEntity(MyResponseDTO.class));

 Mono<Void> workDone = response.then();

这很简单,应该可以完成这项工作。默认情况下(如果我没记错的话),订阅者将请求256元素,这意味着您将获得最多 256 个并行处理的 HTTP 请求。这可以取决于 HTTP 客户端上配置的连接池;默认情况下,在 Reactor Netty 上,最大 TCP 通道数高于此值。

各种 Reactor 运算符,包括flatMap,提供带有concurrency方法参数的变体来控制那里的最大并发性。

您的Flux.merge带有列表的解决方案Mono将是等效的。另一方面, usingFlux.concat不是您想要的,因为它会在Mono请求元素时订阅,因此您可能无法获得所需的最大并发性。

于 2019-12-04T21:55:00.667 回答