我们正在迁移到 Spring WebFlux(使用 reactor-netty)。应用程序使用带有 Spring 控制器的 HTTP 协议。目前我们有一个过渡解决方案,它可以将入站 IO 缓冲区累积到CompositeByteBuf
不复制(然后将其作为 处理InputStream
)。reactor-netty 为我们提供直接字节缓冲区。release()
因此,调用这些缓冲区至关重要。最初我们有代码:
public static Mono<CompositeByteBuf> collectToComposite(Publisher<DataBuffer> data) {
return Flux.from(data).reduce(
EMPTY,
(CompositeByteBuf acc, DataBuffer buffer) -> {
ByteBuf byteBuf = toByteBuf(buffer);
CompositeByteBuf composite = (acc == EMPTY) ? byteBuf.alloc().compositeBuffer(256) : acc;
composite.addComponent(true, byteBuf);
return composite;
}
).map(composite -> composite != EMPTY ? composite : createEmptyComposite());
}
并在处理得到的复合缓冲区之后放置释放。
Publisher
但是,如果上游信号错误,这种方法会导致泄漏。因此,在下一次尝试中,我们尝试使用以下方式处理错误并释放缓冲区(省略一些极端情况处理):
public static Mono<CompositeByteBuf> collectToComposite(Publisher<DataBuffer> data) {
// such code is not suitable for multiple subscribers
class CompositeHolder {
CompositeByteBuf composite;
void addComponent(ByteBuf component) {
if (composite == null) {
composite = component.alloc().compositeBuffer(256);
}
composite.addComponent(true, component);
}
}
CompositeHolder holder = new CompositeHolder();
return Flux.from(data)
.doOnNext(buffer -> holder.addComponent(toByteBuf(buffer)))
.doOnError(e -> holder.composite.release())
.then(Mono.fromSupplier(() -> holder.composite));
}
但在那之后我们意识到有必要在订阅取消时回收缓冲区(这发生在底层连接关闭时)。首先想到的是使用doOnCancel
操作符,但实际上并不能保证我们不能为doOnError
同doOnCancel
一个请求调用回调。因此,直接的解决方案需要我们明确检查缓冲区之前是否已释放。
现在我被困住了。我不知道如何处理此案并避免额外的复杂性。