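I've built a Selector-based system in Java that can accept multiple clients. A ServerSocketChannel is registered with the selector for OP_ACCEPT; it accept()s each incoming connection and registers the resulting SocketChannel with the same selector. Here's that bit: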
    ServerSocketChannel insock = ServerSocketChannel.open();
    insock.configureBlocking(false);
    insock.socket().bind(new InetSocketAddress(6789));
    Selector sel = Selector.open();
    SelectionKey joinchannel = insock.register(sel, SelectionKey.OP_ACCEPT);
    System.out.println("Ready to accept incoming connections.");
    while (true) {
        int ready = sel.selectNow();
        if (ready == 0)
            continue;
        Set<SelectionKey> selectedKeys = sel.selectedKeys();
        Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            if (key.isAcceptable()) {
                SocketChannel newConnection = insock.accept();
                System.out.println("New client " + newConnection + " connected.");
                newConnection.configureBlocking(false);
                newConnection.register(sel, SelectionKey.OP_READ).attach(new DGPlayer());
            }
If I register the new SocketChannel for OP_READ only, this works fine: the isReadable() check succeeds and the data is read. Here's that bit:
            else if (key.isReadable()) {
                ByteBuffer buf = ByteBuffer.allocate(1024);
                int trans = ((SocketChannel) key.channel()).read(buf);
                buf.flip();
                byte[] ba = new byte[buf.remaining()];
                buf.get(ba);
                String msg = new String(ba, 0, ba.length, Charset.forName("UTF-8"));
                if (trans > 0) {
                    DGPlayer client = (DGPlayer) key.attachment();
                    System.out.println(client.name + ": " + msg.trim());
                }
            }
            // else if (key.isWritable()) {
            //     ByteBuffer buf = ByteBuffer.allocate(48);
            //     buf.clear();
            //     buf.put(((String) key.attachment()).getBytes());
            //     buf.flip();
            //
            //     SocketChannel target = ((SocketChannel) key.channel());
            //     while (buf.hasRemaining()) {
            //         target.write(buf);
            //     }
            //
            //     key.attach(null);
            // }
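One detail the read branch above does not cover is read() returning -1, which signals that the peer has closed the connection. The following is a minimal sketch of a read step that separates that case from "no data yet"; ReadSketch and readOnce are hypothetical names, not part of the server, and the in-memory channel in main only stands in for a real SocketChannel.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the original server.
final class ReadSketch {

    /**
     * Performs a single read and decodes whatever arrived as UTF-8.
     * Returns null if the peer closed the connection (read() == -1)
     * and an empty string if no bytes were available.
     * Assumption: one message fits in 1024 bytes and is not split
     * in the middle of a multi-byte character.
     */
    static String readOnce(ReadableByteChannel channel) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        int n = channel.read(buf);
        if (n == -1) {
            return null; // end of stream: the caller should cancel the key and close the channel
        }
        buf.flip();
        return StandardCharsets.UTF_8.decode(buf).toString();
    }

    public static void main(String[] args) throws IOException {
        // An in-memory channel stands in for a SocketChannel here.
        ReadableByteChannel ch = Channels.newChannel(
                new ByteArrayInputStream("hello\n".getBytes(StandardCharsets.UTF_8)));
        System.out.println("first read:  " + readOnce(ch));
        System.out.println("second read: " + readOnce(ch));
    }
}
```

In a selector loop, a null result would typically be followed by key.cancel() and key.channel().close(), so that a disconnected client does not keep showing up as readable.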
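If I register the SocketChannel for OP_READ|OP_WRITE instead, however, nothing happens. As far as I can tell, the selector never finds the channel to be isReadable() or isWritable(). The only change is that the channel is also registered for OP_WRITE.
Any idea why this would be?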
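One thing that can be checked in isolation: a connected channel whose send buffer has room is normally reported as writable straight away, so with OP_WRITE in the interest set a selector would be expected to report the key quickly rather than never. The sketch below is an assumption-laden probe, not the server code: WritableProbe and probeReadyOps are made-up names, and it uses a throwaway loopback connection on an OS-chosen port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical probe for illustration; not part of the original server.
final class WritableProbe {

    /** Returns the ready-ops bitmask reported for a freshly accepted, idle connection. */
    static int probeReadyOps() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector sel = Selector.open()) {
            // Port 0 lets the OS pick a free port; loopback keeps everything local.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                SelectionKey key = accepted.register(
                        sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                sel.select(1000); // wait up to one second for readiness
                return sel.selectedKeys().contains(key) ? key.readyOps() : 0;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int ops = probeReadyOps();
        System.out.println("writable: " + ((ops & SelectionKey.OP_WRITE) != 0));
        System.out.println("readable: " + ((ops & SelectionKey.OP_READ) != 0));
    }
}
```

If the probe reports the key as writable on the same machine, the selector itself is probably behaving and the surrounding loop is the more likely place to look. The Selector documentation says select() and selectNow() return the number of keys whose ready-operation sets were updated, so whether handled keys are removed from selectedKeys() may matter; the server excerpt is cut off before that point, so this is only a guess.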
Edit: for completeness, here is some of the client code:
    while (true) {
        int readyChannels = sel.select();
        if (readyChannels == 0)
            continue;
        Set<SelectionKey> selectedKeys = sel.selectedKeys();
        Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            if (key.isReadable()) {
                ByteBuffer buf = ByteBuffer.allocate(1024);
                int trans = ((SocketChannel) key.channel()).read(buf);
                buf.flip();
                byte[] ba = new byte[buf.remaining()];
                buf.get(ba);
                String msg = new String(ba, 0, ba.length, Charset.forName("UTF-8"));
                if (msg.trim().length() != 0)
                    System.out.println("Response from server: " + msg.trim());
            }
            else if (key.isWritable() && messages.size() > 0) {
                String message = messages.remove();
                ByteBuffer buf = ByteBuffer.allocate(48);
                buf.clear();
                buf.put(message.getBytes(Charset.forName("UTF-8")));
                buf.flip();
                int written = outsock.write(buf);
                while (buf.hasRemaining()) {
                    outsock.write(buf);
                }
                System.out.println("Wrote '" + message + "' to server");
            }
            keyIterator.remove();
        }
    }
(The client registers its channel to the server for OP_READ|OP_WRITE.)
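A side note on the client registration: a connected socket is writable almost all the time, so a key that keeps OP_WRITE in its interest set tends to make select() return immediately on every pass, even when there is nothing queued to send. A common pattern, sketched here as an assumption rather than as a fix for the problem above, is to switch OP_WRITE on only while data is pending. WriteInterest and setWriteInterest are hypothetical names, and the unconnected channel in main exists only to obtain a key.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical helper for illustration; not part of the original client.
final class WriteInterest {

    /** Adds or removes OP_WRITE from the key's interest set, leaving other bits untouched. */
    static void setWriteInterest(SelectionKey key, boolean wanted) {
        int ops = key.interestOps();
        key.interestOps(wanted
                ? (ops | SelectionKey.OP_WRITE)
                : (ops & ~SelectionKey.OP_WRITE));
    }

    public static void main(String[] args) throws IOException {
        try (Selector sel = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);
            // Start with read interest only, as if nothing were queued for sending.
            SelectionKey key = ch.register(sel, SelectionKey.OP_READ);

            setWriteInterest(key, true);   // something was queued
            System.out.println("write interest on:  "
                    + ((key.interestOps() & SelectionKey.OP_WRITE) != 0));

            setWriteInterest(key, false);  // queue drained
            System.out.println("write interest off: "
                    + ((key.interestOps() & SelectionKey.OP_WRITE) == 0));
        }
    }
}
```

In the client loop this would be called with true after a message is added to messages and with false once the queue has been drained. If messages are queued from a different thread than the one blocked in select(), a call to sel.wakeup() would likely be needed as well.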