我在下面的代码中得到了一个 java.nio.channels.NotYetConnectedException,因为我正在尝试写入一个尚未打开的通道。
本质上,我拥有的是一个频道池,如果一个频道空闲,我会在其中抓取一个要写入的频道,如果一个频道不可用,我会创建一个新频道。我的问题是,当我创建一个新通道时,当我调用connect时,通道还没有准备好写入,我不想等待连接打开后再返回,因为我不想阻塞线程。最好的方法是什么?另外,我检索/返回频道的逻辑是否有效?请参阅下面的代码。
我有一个简单的连接池,如下所示:
private static class ChannelPool {
private final ClientBootstrap cb;
private Set<Channel> activeChannels = new HashSet<Channel>();
private Deque<Channel> freeChannels = new ArrayDeque<Channel>();
public ChannelPool() {
ChannelFactory clientFactory =
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
cb = new ClientBootstrap(clientFactory);
cb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(
new HttpRequestEncoder(),
new HttpResponseDecoder(),
new ResponseHandler());
}
});
}
private Channel newChannel() {
ChannelFuture cf;
synchronized (cb) {
cf = cb.connect(new InetSocketAddress("localhost", 18080));
}
final Channel ret = cf.getChannel();
ret.getCloseFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture arg0) throws Exception {
System.out.println("channel closed?");
synchronized (activeChannels) {
activeChannels.remove(ret);
}
}
});
synchronized (activeChannels) {
activeChannels.add(ret);
}
System.out.println("returning new channel");
return ret;
}
public Channel getFreeChannel() {
synchronized (freeChannels) {
while (!freeChannels.isEmpty()) {
Channel ch = freeChannels.pollFirst();
if (ch.isOpen()) {
return ch;
}
}
}
return newChannel();
}
public void returnChannel(Channel ch) {
synchronized (freeChannels) {
freeChannels.addLast(ch);
}
}
}
我正在尝试在处理程序中使用它,如下所示:
private static class RequestHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
final HttpRequest request = (HttpRequest) e.getMessage();
Channel proxyChannel = pool.getFreeChannel();
proxyToClient.put(proxyChannel, e.getChannel());
proxyChannel.write(request);
}
}