我想实现接收到的内容的部分刷新。例如,我有处理程序:
return client
.post()
.body(BodyInserters.fromDataBuffers(
request.body(BodyExtractors.toDataBuffers())))
.exchange()
.....
收到一定数量的数据缓冲区时如何强制刷新?
我想实现接收到的内容的部分刷新。例如,我有处理程序:
return client
.post()
.body(BodyInserters.fromDataBuffers(
request.body(BodyExtractors.toDataBuffers())))
.exchange()
.....
收到一定数量的数据缓冲区时如何强制刷新?
首先,对此提出警告:
默认情况下(即如果您不手动刷新),Netty 将缓冲字节并不时刷新它们,只要通道准备好并且刷新策略认为它很方便。这是针对性能进行优化的。
如果您希望手动刷新,它不能保证对方将以相同的方式接收这些字节组;中间人可能会在此过程中缓冲一些事情。这就是这可能无法实现您想要做的事情的地方:手动刷新通常不是关于性能优化,而是协议语义。
仅当您将其与协议语义(如消息分隔符)配对时,使用手动刷新策略才有用,以便对方知道如何拆分消息(这就是 Spring WebFlux 为 SSE 和application/streaming+json
.
现在为了实现这一点,Reactor 提供了几个windowXYZ
具有不同策略的操作符。Flux.window(int)
是基于元素的数量,windowTimeout(Duration)
基于持续时间等。在这种情况下,您可能需要使用windowUntil(Predicate)
.
让我们尝试实现在缓冲一定数量的数据时刷新的东西。
Flux<DataBuffer> buffers = //...;
int maxSize = //...;
AtomicInteger currentSize = new AtomicInteger(0);
Flux<Flux<DataBuffer>> bufferWindow = buffers.windowUntil(buf -> {
if (currentSize.addAndGet(buf.readableByteCount()) < maxSize) {
return false;
}
currentSize.set(0);
return true;
});
WebClient.create()
.post()
.body((outputMessage, context) -> outputMessage.writeAndFlushWith(bufferWindow))
.retrieve();
请注意,如果您对无限数据流进行操作,则实施存在缺陷:在达到配额或源完成之前,这不会刷新。因此,这可能会保留数据超过必要的时间。