我有以下代码:
List<Mono<MyResponseDTO>> monoList = queue.stream()
.map(jobStatusBunch -> webClient
.post()
.uri("localhost:8080/api/some/url")
.bodyValue(convertToRequestDto(someBean))
.retrieve()
.toEntity(String.class)
.filter(HttpEntity::hasBody)
.map(stringResponseEntity -> {
try {
return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
} catch (JsonProcessingException e) {
log.error("Can't parse", e);
return null;
}
})
.doOnNext(myResponseDTO -> {
log.info("doOnNext is invoked");
})
).collect(Collectors.toList());
//await when all MONOs are completed
Mono<Void> mono = Flux.fromIterable(monoList).then();
mono.subscribe(aVoid -> {
log.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
});
webClient 实例化:
this.webClient = WebClient.builder().build();
我的目标是允许并行执行队列中的所有 http 请求,并在所有请求完成后等待。但是尽管队列不是空的,但实际的 http 调用并没有发生。你能解释一下为什么以及如何解决它吗?
更新
感谢@caco3 的建议。我更正了这样的代码:
log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
.flatMap(Function.identity())
.then();
log.info("Finished waiting for {}", monoList);
我看到以下日志:
2019-11-19 19:17:17.733 INFO 5896 --- [ scheduling-1] c.b.m.service.MyService : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988 INFO 5896 --- [ scheduling-1] c.b.m.service.MyService : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [ scheduling-1] o.s.w.r.f.client.ExchangeFunctions : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230 INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService : doOnNext is invoked
所以当前代码不会等待所有请求终止。但我真的需要它