I need to connect to 4 machines and read data from their sockets. I chose to use the asynchronous model of NIO.2.
Here is the pseudocode:
    class Connector {
        private final AsynchronousChannelGroup group;
        private final String host;
        private final int port;
        //...

        public void connect() {
            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {
                // Block for at most 5 seconds while the connection is established
                client.connect(new InetSocketAddress(host, port)).get(5, TimeUnit.SECONDS);
                if (!group.isShutdown()) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
                }
            } catch (Exception e) {
                //
            }
        }

        private class ReadHandler implements CompletionHandler<Integer, Attachement> {
            final AsynchronousSocketChannel channel;

            ReadHandler(AsynchronousSocketChannel channel) {
                this.channel = channel;
            }

            @Override
            public void completed(Integer result, Attachement attachment) {
                // Reuse the buffer and start the next read
                attachment.buffer.clear();
                channel.read(attachment.buffer, 5, TimeUnit.SECONDS, attachment, this);
            }

            @Override
            public void failed(Throwable exc, Attachement attachment) {
                closeChannel();
            }

            void closeChannel() {
                try {
                    if (channel.isOpen()) {
                        channel.close();
                    }
                } catch (IOException e) {
                    // do something else
                }
            }
        }
    }
To run this code, I create a new group and pass it to all the Connectors:
    ExecutorService executor = Executors.newFixedThreadPool(4);
    AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executor);
    // for each server
    Connector connectorX = new Connector(group, serverX.host, serverX.port);
Problem:
The thread is released after the first read (the line marked with the //#1 comment).
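One possible reading of this symptom, offered as an assumption rather than a confirmed diagnosis: the try-with-resources block closes the channel as soon as connect() returns, while the read started at //#1 is still pending. The sketch below tries to reproduce that on loopback. EarlyCloseSketch, pendingReadFailure and the throwaway local server are hypothetical names invented for this illustration; they are not part of the code above. According to the AsynchronousSocketChannel documentation, a read that is outstanding when the channel is closed should complete with AsynchronousCloseException.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EarlyCloseSketch {

    /** Returns the simple class name of the exception handed to failed(). */
    static String pendingReadFailure() {
        AsynchronousChannelGroup group = null;
        try {
            group = AsynchronousChannelGroup.withFixedThreadPool(2, Executors.defaultThreadFactory());
            CompletableFuture<Throwable> failure = new CompletableFuture<>();
            try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group)) {
                server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
                server.accept(); // hypothetical peer: accepts the connection but never writes
                try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {
                    client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
                    client.read(ByteBuffer.allocate(1024), 5, TimeUnit.SECONDS, null,
                            new CompletionHandler<Integer, Void>() {
                                @Override
                                public void completed(Integer count, Void att) {
                                    failure.completeExceptionally(
                                            new IllegalStateException("unexpected read of " + count));
                                }

                                @Override
                                public void failed(Throwable exc, Void att) {
                                    failure.complete(exc);
                                }
                            });
                } // client.close() runs here, while the read is still pending
                return failure.get(5, TimeUnit.SECONDS).getClass().getSimpleName();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            if (group != null) {
                try {
                    group.shutdownNow(); // also stops the pool threads so the JVM can exit
                } catch (IOException ignored) {
                    // nothing useful to do in a demo
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(pendingReadFailure());
    }
}
```

If this is indeed what happens in connect(), the ReadHandler would go straight to failed() and close the channel, which would look like the thread being released after the first read.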
Solution:
To solve this problem, I introduced a CountDownLatch to make the thread wait for the reads:
    //...
    private CountDownLatch latch;

    public void connect() {
        latch = new CountDownLatch(1);
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {
            client.connect(new InetSocketAddress(host, port)).get(5, TimeUnit.SECONDS);
            if (!group.isShutdown()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
            }
            // Keep the calling thread (and therefore the channel) alive while reads continue
            latch.await();
        } catch (Exception e) {
            //...
            latch.countDown();
        }
    }
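Questions:
Is this (the CountDownLatch) the right way to solve the problem? If not, what is the best way?
Doesn't blocking the thread (latch.await()) contradict the asynchronous model?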
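For comparison, here is a minimal sketch of one alternative that avoids blocking inside connect(), under the assumption that the channel may simply outlive the method that opened it. It drops try-with-resources, connects through a CompletionHandler, and closes the channel only when the read chain ends. AsyncReadSketch, readAll, demo and the loopback server are hypothetical names used purely for illustration, and the CompletableFuture is just one way to report completion to the caller.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class AsyncReadSketch {

    /** Starts connect + read without blocking; the future completes when the peer closes the stream. */
    static CompletableFuture<String> readAll(AsynchronousChannelGroup group, SocketAddress address)
            throws IOException {
        CompletableFuture<String> done = new CompletableFuture<>();
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        // Deliberately not try-with-resources: the channel has to outlive this method
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group);
        // Close the channel once the read chain has finished, successfully or not
        done.whenComplete((text, error) -> {
            try { client.close(); } catch (IOException ignored) { }
        });
        client.connect(address, null, new CompletionHandler<Void, Void>() {
            @Override
            public void completed(Void result, Void att) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, 5, TimeUnit.SECONDS, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer count, ByteBuffer buf) {
                        if (count < 0) { // end of stream
                            done.complete(new String(received.toByteArray(), StandardCharsets.UTF_8));
                            return;
                        }
                        buf.flip();
                        received.write(buf.array(), 0, buf.limit()); // consume before reusing the buffer
                        buf.clear();
                        client.read(buf, 5, TimeUnit.SECONDS, buf, this);
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer buf) {
                        done.completeExceptionally(exc);
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Void att) {
                done.completeExceptionally(exc);
            }
        });
        return done;
    }

    /** Loopback demo: a throwaway server sends one message, then closes the connection. */
    static String demo(String message) {
        AsynchronousChannelGroup group = null;
        try {
            group = AsynchronousChannelGroup.withFixedThreadPool(2, Executors.defaultThreadFactory());
            try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group)) {
                server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
                Future<AsynchronousSocketChannel> accepted = server.accept();
                CompletableFuture<String> result = readAll(group, server.getLocalAddress());
                try (AsynchronousSocketChannel peer = accepted.get(5, TimeUnit.SECONDS)) {
                    ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                    while (out.hasRemaining()) {
                        peer.write(out).get(5, TimeUnit.SECONDS);
                    }
                } // closing the peer ends the client's read loop
                return result.get(5, TimeUnit.SECONDS); // blocking here only so the demo can return a value
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            if (group != null) {
                try { group.shutdownNow(); } catch (IOException ignored) { }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("hello"));
    }
}
```

In this shape no pool thread waits on a latch. The only place that would still need to block is whatever keeps the application alive, for example a call to group.awaitTermination(...) at shutdown.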