基于“helloworld”HTTP 示例(Netty 4.0.12.Final),我想以某种方式推迟编写 HTTP 响应,即请求处理和响应创建应该在 Netty 的工作人员之外执行。响应可用后,Netty 应该从它离开的地方拿起并将响应写入通道。
以下代码应该演示我想要实现的目标。问题是,如果 handleDeferred() Netty 在某处阻塞并且响应永远不会到达客户端。我不确定我是否完全偏离了轨道。因此,任何建议都值得赞赏。
import io.netty.channel.{ChannelFutureListener, ChannelHandlerContext, ChannelInboundHandlerAdapter}
import io.netty.handler.codec.http._
import io.netty.handler.codec.http.HttpHeaders.Values
import io.netty.buffer.Unpooled
import scala.concurrent._
class DispatchingHandler extends ChannelInboundHandlerAdapter {
class StaticResponseFactory {
def response = {
val content = Unpooled.wrappedBuffer("Hey there!".getBytes)
val response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content)
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain")
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes())
response
}
}
override def channelRead(ctx: ChannelHandlerContext, msg: Any) {
val deferred = true // <= select how to respond.
msg match {
case request: HttpRequest if deferred => handleDeferred(ctx, request)
case request: HttpRequest => handleImmediately(ctx, request)
case lastHttpContent: LastHttpContent => /* ignore trailing headers */
case other => println("unexpected message: " + other)
}
}
override def channelReadComplete(ctx: ChannelHandlerContext) {
println("[" + Thread.currentThread() + "] flushing...")
ctx.flush()
}
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
cause.printStackTrace()
}
def handleImmediately(ctx: ChannelHandlerContext, request: HttpRequest) {
val response = new StaticResponseFactory().response
writeResponse(ctx, request, response)
}
def handleDeferred(ctx: ChannelHandlerContext, request: HttpRequest) {
// execute dispatch and request handling on Scala-managed thread pool (global).
import scala.concurrent.ExecutionContext.Implicits.global
future(dispatchRequestAndCollectResponse(request)).onSuccess { case factory =>
writeResponse(ctx, request, factory.response)
}
}
def writeResponse(ctx: ChannelHandlerContext, request: HttpRequest, response: FullHttpResponse) {
if (!HttpHeaders.isKeepAlive(request)) {
val channelFuture = ctx.write (response)
channelFuture.addListener (ChannelFutureListener.CLOSE)
} else {
response.headers ().set (HttpHeaders.Names.CONNECTION, Values.KEEP_ALIVE)
ctx.write (response)
}
}
def dispatchRequestAndCollectResponse(request: HttpRequest): StaticResponseFactory = {
// do some heavy lifting...
Thread.sleep(1000)
new StaticResponseFactory
}
}