2

在文档中写道,您应该将阻塞代码包装到Monohttp ://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

但它没有写如何实际做到这一点。

我有以下代码:

@PostMapping(path = "some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doeSomething(@Valid @RequestBody Flux<Something> something) {
    something.subscribe(something -> {
        // some blocking operation
    });

    // how to return Mono<Void> here?
}

我在这里遇到的第一个问题是我需要返回一些东西,但我不能。例如,如果我会返回一个Mono.empty请求,那么该请求将在通量工作完成之前关闭。

第二个问题是:我如何真正包装阻塞代码,就像文档中建议的那样:

Mono blockingWrapper = Mono.fromCallable(() -> { 
    return /* make a remote synchronous call */ 
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic()); 
4

1 回答 1

6

您不应该subscribe在控制器处理程序中调用,而只是构建一个反应式管道并返回它。最终,HTTP 客户端将请求数据(通过 Spring WebFlux 引擎),这就是向管道订阅和请求数据的内容。

手动订阅会将请求处理与其他操作分离,这将 1) 消除对操作顺序的任何保证,以及 2) 如果该其他操作正在使用 HTTP 资源(例如请求正文),则中断处理。

在这种情况下,源不是阻塞的,而只是变换操作。所以我们最好用publishOn信号来表示链的其余部分应该在特定的调度器上执行。如果这里的操作是 I/O 密集型的,那么Schedulers.elastic()是最好的选择,如果它是 CPU 密集型的,那就Schedulers .paralell更好了。这是一个例子:

@PostMapping(path = "/some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doSomething(@Valid @RequestBody Flux<Something> something) {

    return something.collectList()
      .publishOn(Schedulers.elastic())
      .map(things -> { 
         return processThings(things);
      })
      .then();        
}

public ProcessingResult processThings(List<Something> things) {
  //...
}

有关该主题的更多信息,请查看反应器文档中的调度程序部分。如果您的应用程序倾向于做很多这样的事情,那么您将失去反应式流的很多好处,您可能会考虑切换到基于 Servlet 的模型,您可以在其中相应地配置线程池。

于 2018-08-29T08:32:31.990 回答