1

我们正在迁移到 Spring WebFlux(使用 reactor-netty)。应用程序使用带有 Spring 控制器的 HTTP 协议。目前我们有一个过渡解决方案,它可以将入站 IO 缓冲区累积到CompositeByteBuf不复制(然后将其作为 处理InputStream)。reactor-netty 为我们提供直接字节缓冲区。release()因此,调用这些缓冲区至关重要。最初我们有代码:

public static Mono<CompositeByteBuf> collectToComposite(Publisher<DataBuffer> data) {
  return Flux.from(data).reduce(
      EMPTY,
      (CompositeByteBuf acc, DataBuffer buffer) -> {
        ByteBuf byteBuf = toByteBuf(buffer);
        CompositeByteBuf composite = (acc == EMPTY) ? byteBuf.alloc().compositeBuffer(256) : acc;
        composite.addComponent(true, byteBuf);
        return composite;
      }
  ).map(composite -> composite != EMPTY ? composite : createEmptyComposite());
}

并在处理得到的复合缓冲区之后放置释放。

Publisher但是,如果上游信号错误,这种方法会导致泄漏。因此,在下一次尝试中,我们尝试使用以下方式处理错误并释放缓冲区(省略一些极端情况处理):

public static Mono<CompositeByteBuf> collectToComposite(Publisher<DataBuffer> data) {
  // such code is not suitable for multiple subscribers
  class CompositeHolder {
    CompositeByteBuf composite;

    void addComponent(ByteBuf component) {
      if (composite == null) {
        composite = component.alloc().compositeBuffer(256);
      }
      composite.addComponent(true, component);
    }
  }
  CompositeHolder holder = new CompositeHolder();
  return Flux.from(data)
      .doOnNext(buffer -> holder.addComponent(toByteBuf(buffer)))
      .doOnError(e -> holder.composite.release())
      .then(Mono.fromSupplier(() -> holder.composite));
}

但在那之后我们意识到有必要在订阅取消时回收缓冲区(这发生在底层连接关闭时)。首先想到的是使用doOnCancel操作符,但实际上并不能保证我们不能为doOnErrordoOnCancel一个请求调用回调。因此,直接的解决方案需要我们明确检查缓冲区之前是否已释放。

现在我被困住了。我不知道如何处理此案并避免额外的复杂性。

4

1 回答 1

0

doFinally是一个 doOn 运算符,只要源错误、完成或订阅被取消,就会调用它。此外,它保证回调只执行一次(在错误+取消的情况下)。

您提供一个Consumer<SignalType>填充了SignalType导致回调调用的事件的事件。

于 2018-03-06T12:32:47.417 回答