0

我在下面的代码中得到了一个 java.nio.channels.NotYetConnectedException,因为我正在尝试写入一个尚未打开的通道。

本质上,我拥有的是一个频道池,如果一个频道空闲,我会在其中抓取一个要写入的频道,如果一个频道不可用,我会创建一个新频道。我的问题是,当我创建一个新通道时,当我调用connect时,通道还没有准备好写入,我不想等待连接打开后再返回,因为我不想阻塞线程。最好的方法是什么?另外,我检索/返回频道的逻辑是否有效?请参阅下面的代码。

我有一个简单的连接池,如下所示:

private static class ChannelPool {
    private final ClientBootstrap cb;
    private Set<Channel> activeChannels = new HashSet<Channel>();
    private Deque<Channel> freeChannels = new ArrayDeque<Channel>();
    public ChannelPool() {
        ChannelFactory clientFactory =
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());
        cb = new ClientBootstrap(clientFactory);
        cb.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(
                        new HttpRequestEncoder(),
                        new HttpResponseDecoder(),
                        new ResponseHandler());
            }
        });
    }

    private Channel newChannel() {
        ChannelFuture cf;
        synchronized (cb) {
            cf = cb.connect(new InetSocketAddress("localhost", 18080));
        }
        final Channel ret = cf.getChannel();
        ret.getCloseFuture().addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture arg0) throws Exception {
                System.out.println("channel closed?");
                synchronized (activeChannels) {
                    activeChannels.remove(ret);
                }
            }
        });
        synchronized (activeChannels) {
            activeChannels.add(ret);
        }
        System.out.println("returning new channel");
        return ret;
    }

    public Channel getFreeChannel() {
        synchronized (freeChannels) {
            while (!freeChannels.isEmpty()) {
                Channel ch = freeChannels.pollFirst();
                if (ch.isOpen()) {
                    return ch;
                }
            }
        }
        return newChannel();
    }

    public void returnChannel(Channel ch) {
        synchronized (freeChannels) {
            freeChannels.addLast(ch);
        }
    }
}

我正在尝试在处理程序中使用它,如下所示:

private static class RequestHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
        final HttpRequest request = (HttpRequest) e.getMessage();
        Channel proxyChannel = pool.getFreeChannel();
        proxyToClient.put(proxyChannel, e.getChannel());
        proxyChannel.write(request);
    }
}
4

1 回答 1

2

不必立即将新通道添加到activeChannels之后bootstrap.connect(..),您必须将侦听器添加到ChannelFuture由 返回的bootstrap.connect(..),然后将通道添加到activeChannels添加的侦听器中。这样,getFreeChannel()将永远不会获得尚未连接的通道。

activeChannels因为即使你调用它也可能是空的newChannel()newChannel()甚至在连接建立之前就会返回),你必须决定在这种情况下该怎么做。如果我是你,我会将 from 的返回类型更改为getFreeChannel()to ChannelChannelFuture以便在免费频道准备好时通知调用者。

于 2013-04-04T07:54:22.977 回答