我在 netty 中看到了很多关于分块流的问题,但其中大部分是关于出站流的解决方案,而不是入站流。
我想了解如何从通道获取数据并将其作为 InputStream 发送到我的业务逻辑,而无需先将所有数据加载到内存中。这是我想做的事情:
public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {
private HttpServletRequest request;
private PipedOutputStream os;
private PipedInputStream is;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
this.os = new PipedOutputStream();
this.is = new PipedInputStream(os);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
this.os.close();
this.is.close();
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
if (msg instanceof HttpRequest) {
this.request = new CustomHttpRequest((HttpRequest) msg, this.is);
out.add(this.request);
}
if (msg instanceof HttpContent) {
ByteBuf body = ((HttpContent) msg).content();
if (body.readableBytes() > 0)
body.readBytes(os, body.readableBytes());
if (msg instanceof LastHttpContent) {
os.close();
}
}
}
}
然后我有另一个处理程序,它将获取我的 CustomHttpRequest 并发送到我称之为 ServiceHandler 的东西,我的业务逻辑将从 InputStream 中读取。
public class ServiceRouterHandler extends SimpleChannelInboundHandler<CustomHttpRequest> {
...
@Override
public void channelRead0(ChannelHandlerContext ctx, CustomHttpRequest request) throws IOException {
...
future = serviceHandler.handle(request, response);
...
这不起作用,因为当我的 Handler 将 CustomHttpRequest 转发给 ServiceHandler 并尝试从 InputStream 中读取时,线程被阻塞,并且我的解码器中永远不会处理 HttpContent。
我知道我可以尝试为我的业务逻辑创建一个单独的线程,但我觉得我在这里把事情复杂化了。
我查看了 ByteBufInputStream,但它说
请注意,它最多只能读取构建时确定的可读字节数。
所以我认为它不适用于 Chunked Http 请求。另外,我看到了 ChunkedWriteHandler,这对于 Oubound 块来说似乎很好,但我找不到 ChunkedReadHandler 之类的东西......
所以我的问题是:最好的方法是什么?我的要求是:
- 在发送 ServiceHandlers 之前不要将数据保存在内存中;
- ServiceHandlers API 应该与 netty 无关(这就是为什么我使用我的 CustomHttpRequest,而不是 Netty 的 HttpRequest);
更新
我已经在 CustomHttpRequest 上使用更具反应性的方法来实现这一点。现在,请求没有向 ServiceHandlers 提供 InputStream 以便它们可以读取(这是阻塞的),但是 CustomHttpRequest 现在有一个readInto(OutputStream)
返回 Future 的方法,并且所有服务处理程序都将在这个 Outputstream 被填满时执行. 这是它的样子
public class CustomHttpRequest {
...constructors and other methods hidden...
private final SettableFuture<Void> writeCompleteFuture = SettableFuture.create();
private final SettableFuture<OutputStream> outputStreamFuture = SettableFuture.create();
private ListenableFuture<Void> lastWriteFuture = Futures.transform(outputStreamFuture, x-> null);
public ListenableFuture<Void> readInto(OutputStream os) throws IOException {
outputStreamFuture.set(os);
return this.writeCompleteFuture;
}
ListenableFuture<Void> writeChunk(byte[] buf) {
this.lastWriteFuture = Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) (os) -> {
outputStreamFuture.get().write(buf);
return Futures.immediateFuture(null);
});
return lastWriteFuture;
}
void complete() {
ListenableFuture<Void> future =
Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) x -> {
outputStreamFuture.get().close();
return Futures.immediateFuture(null);
});
addFinallyCallback(future, () -> {
this.writeCompleteFuture.set(null);
});
}
}
我更新后的 ServletRequestHandler 如下所示:
public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {
private NettyHttpServletRequestAdaptor request;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
this.request = new CustomHttpRequest(request, ctx.channel());
out.add(this.request);
}
if (msg instanceof HttpContent) {
ByteBuf buf = ((HttpContent) msg).content();
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
this.request.writeChunk(bytes);
if (msg instanceof LastHttpContent) {
this.request.complete();
}
}
}
}
这工作得很好,但是请注意,这里的所有内容都是在一个线程中完成的,也许对于大数据,我可能想要生成一个新线程来为其他通道释放该线程。