0

我是 NIO 和 NIO2 的新手。我一直在使用 Echo 服务器示例代码,它可以完美运行。我开始写一个类似的客户端。我的目标是让多个客户端套接字连接在一个主线程上运行。

我确实得到了 OP_CONNECT,然后选择器没有返回并从以下超时:while(Selector.select(10000)>0) { .... }

如果我忽略选择器并开始使用 socketChannel.read(...) 读取数据,我可以读取数据。因此,数据已准备好被读取,但我只是没有让 Selector.select(10000) 返回一些键。

这是完整的源代码,我真的很感激任何帮助:

package com.maker.webscraping.nio;

import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class EchoClient2 {
    private static final Logger logger = LoggerFactory.getLogger(EchoClient2.class);
    private static final Map<SocketChannel, EchoClient2> mapClients = new ConcurrentHashMap<>();

    private static final int DEFAULT_PORT = 5555;
    private static final String IP = "127.0.0.1";

    private final int clientID;
    private final ByteBuffer buffer;
    private final SocketChannel socketChannel;
    private final CharsetDecoder decoder;

    public int getClientID() {
        return clientID;
    }

    public ByteBuffer getBuffer() {
        return buffer;
    }

    public CharsetDecoder getDecoder() {
        return decoder;
    }

    //private static Selector selector = null;
    public static void main(String[] args) {
        Selector selector = null;
        try {
            selector = Selector.open();
            if (!selector.isOpen()) 
                throw new RuntimeException("Selector closed!");

            EchoClient2[] clients = new EchoClient2[2];

            clients[0] = new EchoClient2(0, selector);

            // wait for incomming events
            while (selector.select(10000)>0) {
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = (SelectionKey) keys.next();

                    try (SocketChannel socketChannel = (SocketChannel) key.channel()) {

                        if (key.isConnectable()) {
                            // connected
                            logger.info("Client:{} connected!", clients[0].getClientID());
                            key.interestOps(SelectionKey.OP_READ); // <-- desprete tries
                            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); // <-- desprete tries
                            logger.info("R:{}, W:{}, C:{}, validOps:{}", SelectionKey.OP_READ, SelectionKey.OP_WRITE, SelectionKey.OP_CONNECT,  socketChannel.validOps());
                            // close pending connections
                            if (socketChannel.isConnectionPending()) {
                                socketChannel.finishConnect();
                            }

                            read(key,selector); // <-- desprete tries
                            if (key.isReadable()) {
                                read(key,selector);
                            }// else if (key.isWritable()) {
                            //  this.writeOP(key);
                            //}

                        }
                    } catch (IOException e) {
                        logger.error("SocketChannel Exception!", e);
                    }
                }
            }
        } catch (IOException e) {
            logger.error("Selector IOException!", e);
        } finally {
            try {
                selector.close();
            } catch (IOException e) {}
        }       
    }

    public EchoClient2(int clientID, Selector selector) throws IOException {
        this.clientID = clientID;
        buffer = ByteBuffer.allocateDirect(2 * 1024);

        Charset charset = Charset.defaultCharset();
        decoder = charset.newDecoder();
//      if (selector==null)
//          selector = Selector.open();
        socketChannel = SocketChannel.open();
        if ((socketChannel.isOpen()) && (selector.isOpen())) {

            // configure non-blocking mode
            socketChannel.configureBlocking(false);
            // set some options
            socketChannel.setOption(StandardSocketOptions.SO_RCVBUF,
                    128 * 1024);
            socketChannel.setOption(StandardSocketOptions.SO_SNDBUF,
                    128 * 1024);
            socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,
                    true);

            socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            //socketChannel.register(selector, SelectionKey.OP_CONNECT);

            // connect to remote host
            socketChannel.connect(new java.net.InetSocketAddress(IP, DEFAULT_PORT));

            // add it to the map
            mapClients.put(socketChannel, this);
        } else 
            throw new RuntimeException("Channel or Selector closed!");
    }

    // isReadable returned true
    private static int read(SelectionKey key, Selector selector) {

        try {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            EchoClient2 client = mapClients.get(socketChannel);
            ByteBuffer buffer = client.getBuffer();

            buffer.clear();
            int numRead = -1;
            try {
                numRead = socketChannel.read(buffer);
            } catch (IOException e) {
                System.err.println("Cannot read error!");
            }

            if (numRead == -1) {
                mapClients.remove(socketChannel);
                System.out.println("Connection closed by: " + socketChannel.getRemoteAddress());
                socketChannel.close();
                key.cancel();
                return 1;
            }

            if (numRead == 0)
                throw new RuntimeException("numRead is 0!!!");

            buffer.flip();

            CharBuffer charBuffer = client.getDecoder().decode(buffer);
            System.out.println("server says:" + charBuffer.toString());

            if (buffer.hasRemaining()) {
                buffer.compact();
            } else {
                buffer.clear();
            }

            int r = new Random().nextInt(100);
            //if (r == 50) {
            //  System.out.println("50 was generated! Close the socket channel!");
            //  return 1;
            //} 

            ByteBuffer randomBuffer = ByteBuffer.wrap("Random number:".concat(String.valueOf(r))
                            .getBytes("UTF-8"));
            socketChannel.write(randomBuffer);
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); // <-- desprete tries
        } catch (IOException e) {
            logger.error("IOException inside read!", e);

        } 
        return 0;
    }
}

谢谢,

4

1 回答 1

0
while (selector.select(10000)>0)

这段代码已经错了。它将在第一次发生选择超时时停止选择。它应该是:

while (selector.isOpen())
{
    if (selector.select(10000) > 0)
    {
        // ...
    }
}

还有其他问题。

  1. 如果finishConnect()返回,您应该只假设连接是完整的true.
  2. 连接完成时读取无效。你应该只在key.isReadable() 为真时阅读。您需要对此进行单独的测试,而不是在isConnectable()机箱上进行测试。
  3. 同样,您需要一个单独的案例isWritable().
  4. 在你有东西要写之前你不应该注册OP_WRITE,或者在你尝试写东西时得到零长度返回之后更好。OP_WRITE表示套接字发送缓冲区未满。这几乎总是正确的。
  5. 您需要在keys.remove()之后调用,keys.next(),否则您将一遍又一遍地获得相同的选定键。选择器不会清除选定键集。
  6. 您的评论// close pending connections完全不正确。
  7. 以下isConnectionPending()调用是多余的。当然它正在等待中,这就是为什么你得到了OP_CONNECT.
于 2014-09-10T01:45:08.310 回答