2

我是 Spring Reactor 和 WebFlux 的新手,对 Spring 功能 web 中的事件流有点困惑。示例:我有一个处理函数返回一个Mono<ServerResponse>. 在其中,一个findAll()存储库方法被执行,返回一个Flux<T>. 为了遵守响应式宣言,为了实现异步、非阻塞并允许背压,我希望看到onNext()从存储库返回的每个元素都有一个。但是,在请求处理期间查看服务器日志,我只看到一个onNext()事件,这是有道理的,因为我的返回类型是Mono包含响应的:

路由器功能

@Bean
 public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
     return RouterFunctions
             .route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
                     , itemsHandler::getAll);
}

处理函数

Mono<ServerResponse> getAll(ServerRequest request) {
    return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(itemRepository.findAll(), Item.class)
            .log("GET items");
}

事件日志

2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | request(unbounded)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onComplete()

相比之下,实现一个带有Flux<T>as 返回类型的经典 Spring 注释控制器方法,我将看到一个onNext()for 每个实例T(即结果集的每个项目),这对我来说看起来更“正确”(客户端现在可以控制事件流量等):

控制器

@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
    return itemRepository
            .findAll()
            .log("GET items");
}

日志

2020-05-10 15:14:04.135  INFO 19096 --- [ctor-http-nio-5] GET items                                : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136  INFO 19096 --- [ctor-http-nio-5] GET items                                : request(unbounded)
2020-05-10 15:14:04.137  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onComplete()

这令人困惑。让我详细说明:

  • 从某种意义上说,使用Mono<ServerResponse>似乎是邪恶的,因为它将整个结果集封装在一个事件中,对我来说,这感觉就像打破了异步、非阻塞、启用背压的事件流的反应性原则。这不会从客户手中夺走控制权吗?对我来说,这看起来像是传统的、阻塞的客户端/服务器通信。
  • 直接返回Flux<T>感觉好多了,因为它支持按结果事件处理和背压控制。

我的问题是:

  • 创建一个 有什么影响Mono<ServerResponse>?这是否会导致阻塞、同步交互,onNext()仅在从 repo 中读取所有项目时才发出?我会失去背压功能等吗?
  • 如何让功能样式后端为onNext()结果集中的每个项目发送一个?
  • 就完全响应式(即非阻塞、异步和背压兼容)的函数式处理函数的返回类型而言,最佳实践是什么?我不确定是否Mono<ServerResponse>不违反这些反应性原则。

我可能完全错了,或者遗漏了一些重要的东西。谢谢你的帮助!

4

1 回答 1

2

这完全取决于使用ServerResponse. 根据 WebFlux 文档(https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html#spring-webflux)设置处理函数返回Mono<ServerResponse>,不管返回项目的数量是标准方式并且绝对没问题- 只要客户端正确处理底层Flux<T>一切都很好。我的问题出现了,因为我使用 测试了端点curl,它无法检测到底层的Flux. 使用启用功能样式的客户端(如org.springframework.web.reactive.function.client.WebClient),Mono<ServerResponse>可以将其反序列化为Flux<T>第一个,启用所有漂亮的反应功能,并使我们的onNext()事件显示出来。

客户端代码

像这样调用后端,将 ServerResponse 反序列化为 Flux:

@GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
    return  webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
            .retrieve()
            .bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
            .log("GET all items from server");
}

将导致看到所有onNext()事件,启用客户端事件处理:

2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : request(unbounded)
2020-05-10 16:10:10.511  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
2020-05-10 16:10:10.513  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onComplete()

因此,只要对响应进行正确的客户端处理,一切都会很好并且完全反应。

于 2020-05-10T14:27:55.250 回答