我是 Spring Reactor 和 WebFlux 的新手,对 Spring 功能 web 中的事件流有点困惑。示例:我有一个处理函数返回一个Mono<ServerResponse>
. 在其中,一个findAll()
存储库方法被执行,返回一个Flux<T>
. 为了遵守响应式宣言,为了实现异步、非阻塞并允许背压,我希望看到onNext()
从存储库返回的每个元素都有一个。但是,在请求处理期间查看服务器日志,我只看到一个onNext()
事件,这是有道理的,因为我的返回类型是Mono
包含响应的:
路由器功能
@Bean
public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
return RouterFunctions
.route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
, itemsHandler::getAll);
}
处理函数
Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(itemRepository.findAll(), Item.class)
.log("GET items");
}
事件日志
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | request(unbounded)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745 INFO 19096 --- [ctor-http-nio-4] GET items : | onComplete()
相比之下,实现一个带有Flux<T>
as 返回类型的经典 Spring 注释控制器方法,我将看到一个onNext()
for 每个实例T
(即结果集的每个项目),这对我来说看起来更“正确”(客户端现在可以控制事件流量等):
控制器
@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
return itemRepository
.findAll()
.log("GET items");
}
日志
2020-05-10 15:14:04.135 INFO 19096 --- [ctor-http-nio-5] GET items : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136 INFO 19096 --- [ctor-http-nio-5] GET items : request(unbounded)
2020-05-10 15:14:04.137 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onComplete()
这令人困惑。让我详细说明:
- 从某种意义上说,使用
Mono<ServerResponse>
似乎是邪恶的,因为它将整个结果集封装在一个事件中,对我来说,这感觉就像打破了异步、非阻塞、启用背压的事件流的反应性原则。这不会从客户手中夺走控制权吗?对我来说,这看起来像是传统的、阻塞的客户端/服务器通信。 - 直接返回
Flux<T>
感觉好多了,因为它支持按结果事件处理和背压控制。
我的问题是:
- 创建一个 有什么影响
Mono<ServerResponse>
?这是否会导致阻塞、同步交互,onNext()
仅在从 repo 中读取所有项目时才发出?我会失去背压功能等吗? - 如何让功能样式后端为
onNext()
结果集中的每个项目发送一个? - 就完全响应式(即非阻塞、异步和背压兼容)的函数式处理函数的返回类型而言,最佳实践是什么?我不确定是否
Mono<ServerResponse>
不违反这些反应性原则。
我可能完全错了,或者遗漏了一些重要的东西。谢谢你的帮助!