5

我需要连接到 4 台机器并从套接字读取数据。我选择使用 nio2 的异步模型。

这是一个伪代码:

class Connector {

    private final AsynchronousChannelGroup group;
    private final String host;
    private final int port
    //...
    
    public void connect() {
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {

            client.connect(new InetSocketAddress(host(), port())).get(5, TimeUnit.SECONDS);

            if (!group.isShutdown()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
            }

        } catch (Exception e) {
            //
        }
    }

    private class ReadHandler implements CompletionHandler<Integer, Attachement> {

        final AsynchronousSocketChannel channel;

        @Override
        public void completed(Integer result, Attachement attachment) {
            attachment.buffer.clear();
            channel.read(attachment.buffer, 5, TimeUnit.SECONDS, attachment, this);
        }

        @Override
        public void failed(Throwable exc, Attachement attachment) {
            closeChannel();
        }

        void closeChannel() {
            try {
                if (channel.isOpen()) {
                    channel.close();
                }
            } catch (IOException e) {
                // do something else
            }
        }
    }
}

要运行此代码,我创建一个新组并将其传递给所有Connectors :

ExecutorService executor = Executors.newFixedThreadPool(4);
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executor);

//for each server
Connector connectorX = new Connector(group, serverX.host, serverX.port);

问题:

线程在第一次阅读后发布(您可以在其中找到//#1评论)。

解决方案 !

为了解决这个问题,我引入了一个CountDownLatch让线程等待读取:


//...
private CountDownLatch latch; 

public void connect() {

        latch = new CountDownLatch(1);

        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {

            client.connect(new InetSocketAddress(host(), port())).get(5, TimeUnit.SECONDS);

            if (!group.isShutdown()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
            }
            
            latch.await();

        } catch (Exception e) {
            //...
            latch.countDown();
        }
}

问题:

这是 ( CountDownLatch) 解决问题的正确方法吗?如果没有最好的方法是什么?

阻塞线程(latch.await())是不是与异步模型相矛盾?

4

0 回答 0