3

我想进行一个异步休息调用,我正在使用spring webclient并取回一个Mono。我也在并行进行一些数据库调用,但由于某种原因不能被动地完成。

    Map<String, Object> models = new HashMap<>();

    Mono<User> users = this.webClient...;
    users.map(resp -> new UserState(userRequest, resp))
            .subscribe(response -> {
                models.put("userState", response);
            });
    Iterable<Product> messages = this.productRepository.findAll();
    models.put("products", messages);
    //Wait for users.subscribe to finish <<<<<<<<<<<<<HERE
    return new ModelAndView("messages/list", models);

如何在返回 ModelAndView 之前等待订阅完成。Future如果我使用一个我可以get()随时随地做的地方,这会很容易。

4

1 回答 1

1

您可以将阻塞调用包装在Mono单独的调度程序上执行的 a 中,将其与Mono包含的UserState数据一起压缩并将它们的组合转换为 a Mono<ModelAndView>(可以从 Spring 控制器方法返回)。调用将并行执行,两个调用完成后结果将合并。

您可以为每个应用程序定义一个有界调度程序,专门用于阻塞调用,并将其作为构造函数参数提供给任何进行阻塞调用的类。

代码如下所示:

@Configuration 
class SchedulersConfig {

  @Bean
  Scheduler parallelScheduler(@Value("${blocking-thread-pool-size}") int threadsCount) {
    return Schedulers.parallel(threadsCount);
  }
}

@RestController
class Controller {

  final Scheduler parallelScheduler;

  ...

  Mono<User> userResponse = // webClient...

  Mono<Iterable<Product>> productsResponse = Mono.fromSupplier(productRepository::findAll)
    .subscribeOn(parallelScheduler); 

  return Mono.zip(userResponse, productsResponse, (user, products) -> 
    new ModelAndView("messages/list", 
      ImmutableMap.of(
        "userState", new UserState(userRequest, user),
        "products", products
      ))
  );
}

根据评论更新:
如果您只需要异步执行 HTTP 调用,然后将其与数据库结果连接,您可以执行以下操作

Map<String, Object> models = new HashMap<>();
Mono<User> userMono = webClient...;
CompletableFuture<User> userFuture = userMono.toFuture();
Iterable<Product> messages = productRepository.findAll();
User user = userFuture.join();
models.put("products", messages);
models.put("userState", new UserState(userRequest, user));
return new ModelAndView("messages/list", models);
于 2019-06-04T14:00:16.747 回答