2

我最近开始尝试使用 Netty。并想出了这样的代码。我要实现一个 HttpClient 但我有一些限制:我们的系统中只允许一个连接。因此,客户端应该仅在旧连接已死时才创建新连接。但是,我还没有设法通过一个渠道两次发送和获取响应/请求。

我正在使用 Netty - 4.0.2.Final

这是我的例子:

object HttpClient {

  private val group: EventLoopGroup = new NioEventLoopGroup()
  private val uri = new URI("http://www.google.at/")

  var closed = false

  def main(args: Array[String]): Unit = run

  def run: Unit = {
    try {
      val req: HttpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath)
      req.headers().set(HttpHeaders.Names.HOST, uri.getHost)
      req.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
      req.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)

      val cl = new Bootstrap()

      cl.group(group).channel(classOf[NioSocketChannel]).handler(new ChannelInitializer[SocketChannel] {

        override def initChannel(ch: SocketChannel): Unit = {
          val p: ChannelPipeline = ch.pipeline
          p.addLast("log", new LoggingHandler(LogLevel.INFO))
          p.addLast("codec", new HttpClientCodec())
          p.addLast("gzip", new HttpContentDecompressor())
          p.addLast("aggregator", new HttpObjectAggregator(1048576))
          p.addLast("handler", new SimpleChannelInboundHandler[HttpObject] {

            override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject): Unit = {
              if (msg.isInstanceOf[HttpResponse]) {
                val resp = msg.asInstanceOf[HttpResponse]
                println(s"STATUS: ${resp.getStatus} - ${ctx.channel}")
                println(s"VERSION: ${resp.getProtocolVersion} - ${ctx.channel}")
              }

              if(!closed) {
                send(ctx.channel, req).addListener(new ChannelFutureListener(){
                  override def operationComplete(cf: ChannelFuture): Unit = {
                    println(s"it's supposed to be a second request : ${cf}")
                  }
                })
                closed = false
              } else {
                ctx.channel.close
              }

            }
            override def exceptionCaught(ctx: ChannelHandlerContext, th: Throwable): Unit = {
              th.printStackTrace
              ctx.close
            }  
          })

        }

      })

      val chF  = cl.connect(uri.getHost, 80).awaitUninterruptibly()

      chF.addListener(new ChannelFutureListener() {
        override def operationComplete(cf: ChannelFuture): Unit = {
          println(s"STATUS: ${cf} ")
        }
      })
      val ch = chF.channel

      send(ch, req).addListener(new ChannelFutureListener(){
        override def operationComplete(cf: ChannelFuture): Unit = {
          println(s"it's a first request : ${cf}")
        }
      })

      if(ch.isActive) println("Channel is still active")
      if(ch.isOpen) println("Channel is still open")
      if(ch.isWritable) println("Channel is still writable")

      ch.closeFuture.addListener(new ChannelFutureListener(){
        override def operationComplete(cf: ChannelFuture): Unit = {
          println(s"Channel: $cf is closed")
        }
      }).sync

    } finally {
      group.shutdownGracefully
    }
  }

  def send(ch: Channel, r: HttpRequest): ChannelFuture = ch.writeAndFlush(r)

 }

执行后我得到这样的输出:

STATUS: DefaultChannelPromise@3f6a99fd(success) 
Channel is still active
Channel is still open
Channel is still writable
it's a first request : DefaultChannelPromise@4d1a802(success)
STATUS: 200 OK - [id: 0x8e780d38, /10.0.0.8:39594 => www.google.at/173.194.113.95:80]
VERSION: HTTP/1.1 - [id: 0x8e780d38, /10.0.0.8:39594 => www.google.at/173.194.113.95:80]
it's supposed to be a second request : DefaultChannelPromise@7b88f65(failure(java.lang.ArrayIndexOutOfBoundsException: -1)
Channel: AbstractChannel$CloseFuture@3acefb4d(success) is closed

和例外:

java.lang.ArrayIndexOutOfBoundsException: -1
    at java.util.ArrayList.elementData(ArrayList.java:371)
    at java.util.ArrayList.get(ArrayList.java:384)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:114)
    at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:193)
    at io.netty.channel.DefaultChannelHandlerContext.invokeWrite0(DefaultChannelHandlerContext.java:698)
    at io.netty.channel.DefaultChannelHandlerContext.access$1700(DefaultChannelHandlerContext.java:27)
    at io.netty.channel.DefaultChannelHandlerContext$18.run(DefaultChannelHandlerContext.java:689)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:353)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:366)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
    at java.lang.Thread.run(Thread.java:724)

我希望得到我的第二个请求的响应,但似乎我什至没有设法发送它。

请问有人可以提出任何建议吗?谢谢。

4

1 回答 1

0

通过升级到 Netty 4.0.6.Final 并使用DefaultFullHttpRequest而不是DefaultHttpRequest解决了这个问题。

于 2013-08-05T17:04:51.237 回答