1

基于“helloworld”HTTP 示例(Netty 4.0.12.Final),我想以某种方式推迟编写 HTTP 响应,即请求处理和响应创建应该在 Netty 的工作人员之外执行。响应可用后,Netty 应该从它离开的地方拿起并将响应写入通道。

以下代码应该演示我想要实现的目标。问题是,如果 handleDeferred() Netty 在某处阻塞并且响应永远不会到达客户端。我不确定我是否完全偏离了轨道。因此,任何建议都值得赞赏。

import io.netty.channel.{ChannelFutureListener, ChannelHandlerContext, ChannelInboundHandlerAdapter}
import io.netty.handler.codec.http._
import io.netty.handler.codec.http.HttpHeaders.Values
import io.netty.buffer.Unpooled
import scala.concurrent._

class DispatchingHandler extends ChannelInboundHandlerAdapter {

  class StaticResponseFactory {
    def response = {
      val content = Unpooled.wrappedBuffer("Hey there!".getBytes)
      val response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content)
      response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain")
      response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes())
      response
    }
  }

  override def channelRead(ctx: ChannelHandlerContext, msg: Any) {
    val deferred = true // <= select how to respond.
    msg match {
      case request: HttpRequest if deferred => handleDeferred(ctx, request)
      case request: HttpRequest => handleImmediately(ctx, request)
      case lastHttpContent: LastHttpContent => /* ignore trailing headers */
      case other => println("unexpected message: " + other)
    }
  }

  override def channelReadComplete(ctx: ChannelHandlerContext) {
    println("[" + Thread.currentThread() + "] flushing...")
    ctx.flush()
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
    cause.printStackTrace()
  }

  def handleImmediately(ctx: ChannelHandlerContext, request: HttpRequest) {
    val response = new StaticResponseFactory().response
    writeResponse(ctx, request, response)
  }

  def handleDeferred(ctx: ChannelHandlerContext, request: HttpRequest) {
    // execute dispatch and request handling on Scala-managed thread pool (global).
    import scala.concurrent.ExecutionContext.Implicits.global
    future(dispatchRequestAndCollectResponse(request)).onSuccess { case factory =>
      writeResponse(ctx, request, factory.response)
    }
  }

  def writeResponse(ctx: ChannelHandlerContext, request: HttpRequest, response: FullHttpResponse) {
    if (!HttpHeaders.isKeepAlive(request)) {
      val channelFuture = ctx.write (response)
      channelFuture.addListener (ChannelFutureListener.CLOSE)
    } else {
      response.headers ().set (HttpHeaders.Names.CONNECTION, Values.KEEP_ALIVE)
      ctx.write (response)
    }
  }

  def dispatchRequestAndCollectResponse(request: HttpRequest): StaticResponseFactory = {
    // do some heavy lifting...
    Thread.sleep(1000)
    new StaticResponseFactory
  }
}
4

0 回答 0