我最近开始尝试使用 Netty。并想出了这样的代码。我要实现一个 HttpClient 但我有一些限制:我们的系统中只允许一个连接。因此,客户端应该仅在旧连接已死时才创建新连接。但是,我还没有设法通过一个渠道两次发送和获取响应/请求。
我正在使用 Netty - 4.0.2.Final
这是我的例子:
object HttpClient {
private val group: EventLoopGroup = new NioEventLoopGroup()
private val uri = new URI("http://www.google.at/")
var closed = false
def main(args: Array[String]): Unit = run
def run: Unit = {
try {
val req: HttpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath)
req.headers().set(HttpHeaders.Names.HOST, uri.getHost)
req.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
req.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
val cl = new Bootstrap()
cl.group(group).channel(classOf[NioSocketChannel]).handler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
val p: ChannelPipeline = ch.pipeline
p.addLast("log", new LoggingHandler(LogLevel.INFO))
p.addLast("codec", new HttpClientCodec())
p.addLast("gzip", new HttpContentDecompressor())
p.addLast("aggregator", new HttpObjectAggregator(1048576))
p.addLast("handler", new SimpleChannelInboundHandler[HttpObject] {
override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject): Unit = {
if (msg.isInstanceOf[HttpResponse]) {
val resp = msg.asInstanceOf[HttpResponse]
println(s"STATUS: ${resp.getStatus} - ${ctx.channel}")
println(s"VERSION: ${resp.getProtocolVersion} - ${ctx.channel}")
}
if(!closed) {
send(ctx.channel, req).addListener(new ChannelFutureListener(){
override def operationComplete(cf: ChannelFuture): Unit = {
println(s"it's supposed to be a second request : ${cf}")
}
})
closed = false
} else {
ctx.channel.close
}
}
override def exceptionCaught(ctx: ChannelHandlerContext, th: Throwable): Unit = {
th.printStackTrace
ctx.close
}
})
}
})
val chF = cl.connect(uri.getHost, 80).awaitUninterruptibly()
chF.addListener(new ChannelFutureListener() {
override def operationComplete(cf: ChannelFuture): Unit = {
println(s"STATUS: ${cf} ")
}
})
val ch = chF.channel
send(ch, req).addListener(new ChannelFutureListener(){
override def operationComplete(cf: ChannelFuture): Unit = {
println(s"it's a first request : ${cf}")
}
})
if(ch.isActive) println("Channel is still active")
if(ch.isOpen) println("Channel is still open")
if(ch.isWritable) println("Channel is still writable")
ch.closeFuture.addListener(new ChannelFutureListener(){
override def operationComplete(cf: ChannelFuture): Unit = {
println(s"Channel: $cf is closed")
}
}).sync
} finally {
group.shutdownGracefully
}
}
def send(ch: Channel, r: HttpRequest): ChannelFuture = ch.writeAndFlush(r)
}
执行后我得到这样的输出:
STATUS: DefaultChannelPromise@3f6a99fd(success)
Channel is still active
Channel is still open
Channel is still writable
it's a first request : DefaultChannelPromise@4d1a802(success)
STATUS: 200 OK - [id: 0x8e780d38, /10.0.0.8:39594 => www.google.at/173.194.113.95:80]
VERSION: HTTP/1.1 - [id: 0x8e780d38, /10.0.0.8:39594 => www.google.at/173.194.113.95:80]
it's supposed to be a second request : DefaultChannelPromise@7b88f65(failure(java.lang.ArrayIndexOutOfBoundsException: -1)
Channel: AbstractChannel$CloseFuture@3acefb4d(success) is closed
和例外:
java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:371)
at java.util.ArrayList.get(ArrayList.java:384)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:114)
at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:193)
at io.netty.channel.DefaultChannelHandlerContext.invokeWrite0(DefaultChannelHandlerContext.java:698)
at io.netty.channel.DefaultChannelHandlerContext.access$1700(DefaultChannelHandlerContext.java:27)
at io.netty.channel.DefaultChannelHandlerContext$18.run(DefaultChannelHandlerContext.java:689)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:353)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:366)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:724)
我希望得到我的第二个请求的响应,但似乎我什至没有设法发送它。
请问有人可以提出任何建议吗?谢谢。