1

我正在尝试构建一种方法,该方法应该对一个外部端点执行许多 HTTP 请求,而一个查询参数小于 45000。

我需要这样做,因为外部端点允许我获取 100 个项目,但要获取的项目超过 44000 个。

private int offset = 0;

public Flux<List<Model>> getItems() {
    return Flux.from(
            webClientBuilder
                    .build()
                    .get()
                    .uri(uriBuilder -> uriBuilder
                            .path("/getItems")
                            .queryParam("limit", 100)
                            .queryParam("offset", getOffset())
                            .build())
                    .retrieve()
                    .bodyToMono(Model.class)
                    .doOnSuccess(System.out::println)
                    .flatMap(model -> {
                        setOffset(getOffset() + 100);
                        log.info("Offset: " + getOffset());
                        return repository.saveAll(model.getData().getResults()).collectList();
                    }).delayElement(Duration.ofSeconds(15)))
                    .repeat(() -> getOffset() <= 45000);
}

public int getOffset() {
    return offset;
}

public void setOffset(int offset) {
    this.offset = offset;
}

它似乎有效,因为记录偏移参数增加但 HTTP 请求的偏移量等于 0。该方法返回前 100 个项目而不是 44566 个项目

4

1 回答 1

3

问题实际上是,它webclient是在订阅之前急切构建的,并且“缓存”了初始offset值。每次调用后,Flux都会重新订阅,但准备好的带有偏移量的 Web 服务调用仍然“缓存”。您必须weblient以惰性方式提供(例如通过将其包装在 lambda 中),这会强制其所有参数为每次调用重新计算。有一个特殊的运算符 - defer()

解决方案

Mono<Model> response = Mono.defer(() -> webClientBuilder
        .build()
        .get()
        .uri(uriBuilder -> uriBuilder
                .path("/getItems")
                .queryParam("limit", 100)
                .queryParam("offset", getOffset())
                .build())
        .retrieve()
        .bodyToMono(Model.class)
);


Flux.from(response
        .doOnEach(System.out::println)
        .flatMap(model -> {
            setOffset(getOffset() + 100);
            log.info("Offset: " + getOffset());
            return repository.saveAll(model.getData().getResults()).collectList();
        }).delayElement(Duration.ofSeconds(15))
).repeat(() -> getOffset() <= 45000).subscribe();

另一个与急切执行演示相同问题的问题: 总是调用 Mono switchIfEmpty()

于 2019-07-21T10:06:46.673 回答