4

我不明白反应式网络客户端的工作原理。它说spring webclient是非阻塞客户端,但是这个webclient似乎在等待来自远程api的onComplete()信号,然后它可以处理从远程api发出的每个项目。当从目标 api 触发 onNext() 时,我期望 webclient 可以处理每个项目

我是春季 webflux 世界的新手。我读过它,它说它使用 netty 作为默认服务器。而这个网络使用事件循环。所以为了理解它是如何工作的,我尝试创建 2 个小应用程序,客户端和服务器。服务器应用程序仅返回简单的通量,每个项目延迟 1 秒。客户端应用程序使用 webclient 调用远程 api。

服务器:

@GetMapping(ITEM_END_POINT_V1)
public Flux<Item> getAllItems(){
        return Flux.just(new Item(null, "Samsung TV", 399.99),
                new Item(null, "LG TV", 329.99),
                new Item(null, "Apple Watch", 349.99),
                new Item("ABC", "Beats HeadPhones", 
      149.99)).delayElements(Duration.ofSeconds(1)).log("Item : ");
}

客户:

WebClient webClient = WebClient.create("http://localhost:8080");

@GetMapping("/client/retrieve")
public Flux<Item> getAllItemsUsingRetrieve() {
        return webClient.get().uri("/v1/items")
                .retrieve()
                .bodyToFlux(Item.class).log();
}

从服务器登录:

2019-05-01 22:44:20.121  INFO 19644 --- [ctor-http-nio-2] Item :                                   : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-05-01 22:44:20.122  INFO 19644 --- [ctor-http-nio-2] Item :                                   : request(unbounded)
2019-05-01 22:44:21.126  INFO 19644 --- [     parallel-1] Item :                                   : onNext(Item(id=null, description=Samsung TV, price=399.99))
2019-05-01 22:44:22.129  INFO 19644 --- [     parallel-2] Item :                                   : onNext(Item(id=null, description=LG TV, price=329.99))
2019-05-01 22:44:23.130  INFO 19644 --- [     parallel-3] Item :                                   : onNext(Item(id=null, description=Apple Watch, price=349.99))
2019-05-01 22:44:24.131  INFO 19644 --- [     parallel-4] Item :                                   : onNext(Item(id=ABC, description=Beats HeadPhones, price=149.99))
2019-05-01 22:44:24.132  INFO 19644 --- [     parallel-4] Item :                                   : onComplete()

来自客户端的日志:

2019-05-01 22:44:19.934  INFO 24164 --- [ctor-http-nio-2] reactor.Flux.MonoFlatMapMany.1           : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2019-05-01 22:44:19.936  INFO 24164 --- [ctor-http-nio-2] reactor.Flux.MonoFlatMapMany.1           : request(unbounded)
2019-05-01 22:44:19.940 TRACE 24164 --- [ctor-http-nio-2] o.s.w.r.f.client.ExchangeFunctions       : [7e73de5c] HTTP GET http://localhost:8080/v1/items, headers={}
2019-05-01 22:44:24.159 TRACE 24164 --- [ctor-http-nio-6] o.s.w.r.f.client.ExchangeFunctions       : [7e73de5c] Response 200 OK, headers={masked}
2019-05-01 22:44:24.204  INFO 24164 --- [ctor-http-nio-6] reactor.Flux.MonoFlatMapMany.1           : onNext(Item(id=null, description=Samsung TV, price=399.99))
2019-05-01 22:44:24.204  INFO 24164 --- [ctor-http-nio-6] reactor.Flux.MonoFlatMapMany.1           : onNext(Item(id=null, description=LG TV, price=329.99))
2019-05-01 22:44:24.204  INFO 24164 --- [ctor-http-nio-6] reactor.Flux.MonoFlatMapMany.1           : onNext(Item(id=null, description=Apple Watch, price=349.99))
2019-05-01 22:44:24.204  INFO 24164 --- [ctor-http-nio-6] reactor.Flux.MonoFlatMapMany.1           : onNext(Item(id=ABC, description=Beats HeadPhones, price=149.99))
2019-05-01 22:44:24.205  INFO 24164 --- [ctor-http-nio-6] reactor.Flux.MonoFlatMapMany.1           : onComplete()

我希望客户端不会等待 4 秒然后得到实际结果。如您所见,服务器在 22:44:21.126 开始发出 onNext() ,客户端在22:44:24.159获得结果。所以我不明白如果 webclient 有这种行为,为什么它被称为非阻塞客户端。

4

1 回答 1

6

WebClient 是非阻塞的,从某种意义上说,通过 WebClient 发送 HTTP 请求的线程不会被 IO 操作阻塞。当响应可用时,netty 将通知其中一个工作线程,它将根据您定义的反应流操作处理响应。

在您的示例中,服务器将等待 Flux 中的所有元素都可用(4 秒),将它们序列化为 JSON 数组,然后在单个 HTTP 响应中将其发送回。

客户端等待这个单一的响应,但在此期间没有它的线程被阻塞。

如果要实现流式传输效果,则需要利用不同的内容类型或 WebSockets 等底层协议。查看以下关于application/stream+json内容类型的 SO 线程: Spring WebFlux Flux behavior with non streaming application/json

于 2019-05-08T13:39:35.153 回答