0

我想实现接收到的内容的部分刷新。例如,我有处理程序:

return client
    .post()
    .body(BodyInserters.fromDataBuffers(
        request.body(BodyExtractors.toDataBuffers())))
    .exchange()
    .....

收到一定数量的数据缓冲区时如何强制刷新?

4

1 回答 1

0

首先,对此提出警告:

  • 默认情况下(即如果您不手动刷新),Netty 将缓冲字节并不时刷新它们,只要通道准备好并且刷新策略认为它很方便。这是针对性能进行优化的。

  • 如果您希望手动刷新,它不能保证对方将以相同的方式接收这些字节组;中间人可能会在此过程中缓冲一些事情。这就是这可能无法实现您想要做的事情的地方:手动刷新通常不是关于性能优化,而是协议语义。

  • 仅当您将其与协议语义(如消息分隔符)配对时,使用手动刷新策略才有用,以便对方知道如何拆分消息(这就是 Spring WebFlux 为 SSE 和application/streaming+json.

现在为了实现这一点,Reactor 提供了几个windowXYZ具有不同策略的操作符。Flux.window(int)是基于元素的数量,windowTimeout(Duration)基于持续时间等。在这种情况下,您可能需要使用windowUntil(Predicate).

让我们尝试实现在缓冲一定数量的数据时刷新的东西。

Flux<DataBuffer> buffers = //...;

int maxSize = //...;
AtomicInteger currentSize = new AtomicInteger(0);
Flux<Flux<DataBuffer>> bufferWindow = buffers.windowUntil(buf -> {
    if (currentSize.addAndGet(buf.readableByteCount()) < maxSize) {
        return false;
    }
    currentSize.set(0);
    return true;
});

WebClient.create()
        .post()
        .body((outputMessage, context) -> outputMessage.writeAndFlushWith(bufferWindow))
        .retrieve();

请注意,如果您对无限数据流进行操作,则实施存在缺陷:在达到配额或源完成之前,这不会刷新。因此,这可能会保留数据超过必要的时间。

于 2018-09-03T13:00:46.457 回答