0

我发现除了简单的情况外,NIO 的文档记录最多。即便如此,我已经完成了教程和几次重构,最终退回到最简单的情况,但我仍然偶尔会通过读取 0 字节的 SocketChannel 来触发 isReadable。并非每次执行都会发生。

我曾经在一个单独的线程中从附加对象调用读取,并认为这可能是竞争条件,但我已经开始在选择器的线程中进行读取,但问题仍然存在。我想它可能是我的测试客户端,但我不确定什么会不一致地触发它,因为客户端套接字在收到服务器的响应之前不应该关闭。

因此,在包含的代码中,此代码段发送的“hello”消息每次都能像我预期的那样正常

        out.write("hello".getBytes());
        out.write(EOT);
        out.flush();

在此之后,我偶尔会得到一个 0 长度的套接字通道。有时会从这个片段中得到正确的响应:

        out.write(dataServerCredentials.getBytes());
        out.write(EOT);
        out.flush();

对此的任何见解将不胜感激,它正在慢慢杀死我。我已经尝试在这里找到答案,但似乎相关的一个问题并没有真正说明我的问题。

提前致谢!

下面的代码片段:

选择方法:

public void execute()
{
    initializeServerSocket();

    for (;;)
    {
        try
        {
            System.out.println("Waiting for socket activity");

            selector.select();

            Iterator<SelectionKey> selectedKeys = 
                this.selector.selectedKeys().iterator();
            while(selectedKeys.hasNext())
            {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) 
                {
                    continue;
                }

                if (key.isAcceptable())
                {   // New connection
                    // TODO: Create helper method for this that configures user info?
                    System.out.println("Accepting connection");

                    ServerSocketChannel serverSocketChannel =
                        (ServerSocketChannel)key.channel();
                    SocketChannel socketChannel =
                        serverSocketChannel.accept();

                    socketChannel.socket().setSoTimeout(0);
                    socketChannel.configureBlocking(false);
                    SelectionKey newKey = 
                        socketChannel.register(selector, SelectionKey.OP_READ);

                    // Create and attach an AuthProcessor to drive the states of this
                    // new Authentication request
                    newKey.attach(new AuthenticationRequestProcessor(newKey));

                }
                else if (key.isReadable())
                {   // Socket has incoming communication
                    AuthenticationRequestProcessor authProcessor =
                        (AuthenticationRequestProcessor)key.attachment();

                    if (authProcessor == null)
                    {   // Cancel Key
                        key.channel().close();
                        key.cancel();
                        System.err.print("Cancelling Key - No Attachment");
                    }
                    else
                    {   
                        if (authProcessor.getState() ==
                            AuthenticationRequestProcessor.TERMINATE_STATE)
                        {   // Cancel Key
                            key.channel().close();
                            key.cancel();
                        }
                        else
                        {   // Process new socket data
                            authProcessor.process(readStringFromKey(key));
                        }
                    }
                }                    
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

读取方法(忽略这里的一些愚蠢,这是从另一个线程中拉出来的)

protected String readStringFromKey(SelectionKey key)
{
    SocketChannel socketChannel = (SocketChannel)key.channel();

    readBuffer.clear();

    String message = null;

    try
    {
        final int bytesRead = socketChannel.read(readBuffer);

        if (-1 == bytesRead)
        {   // Empty/Closed Channel
            System.err.println("Error - No bytes to read on selected channel");
        }
        else
        {   // Convert ByteBuffer into a String
            System.out.println("Bytes Read: " + bytesRead);
            readBuffer.flip();
            message = byteBufferToString(readBuffer, bytesRead);
            readBuffer.clear();
        }
    }
    catch (IOException e)
    {
        // TODO Auto-generated catch block

        e.printStackTrace();
    }

    // Trim EOT off the end of the message
    return message.trim();
}

客户端片段:

    public void connect()
{
    boolean connectionStatus = false;
    String connectionHost = null;
    int connectionPort = 0;
    String connectionAuthKey = null;

    try
    {   // Login
        authenticationSocket = new Socket(AUTH_HOST, AUTH_PORT);
        out = authenticationSocket.getOutputStream();
        in = new BufferedInputStream(authenticationSocket.getInputStream());

        out.write("hello".getBytes());
        out.write(EOT);
        out.flush();


        StringBuilder helloResponse = new StringBuilder();

        // Read response off socket
        int currentByte = in.read();

        while (currentByte > -1 && currentByte != EOT)
        {
            helloResponse.append((char)currentByte);
            currentByte = in.read();
        }

        outgoingResponses.offer(Plist.fromXml(helloResponse.toString()));
        System.out.println("\n" + helloResponse.toString());

        out.write(credentials.getBytes());
        out.write(EOT);
        out.flush();

        // Read status
        int byteRead;

        StringBuilder command = new StringBuilder();

        do 
        {
            byteRead = in.read();
            if (0 < byteRead) 
            {
                if (EOT == byteRead)
                {
                    Logger.logData(command.toString());

                    Map<String, Object> plist = Plist.fromXml(command.toString());
                    outgoingResponses.offer(plist);

                    // Connection info for Data Port
                    connectionStatus = (Boolean)plist.get(STATUS_KEY);
                    connectionHost = (String)plist.get(SERVER_KEY);
                    connectionPort = (Integer)plist.get(PORT_KEY);
                    connectionAuthKey = (String)plist.get(AUTH_KEY);

                    Logger.logData("Server =>" + plist.get("server"));

                    command = new StringBuilder();

                }
                else
                {
                    command.append((char)byteRead);
                }
            }
        } 
        while (EOT != byteRead);
    }
    catch (UnknownHostException e)
    {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    catch (IOException e)
    {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    catch (XmlParseException e)
    {
        Logger.logData("Invalid Plist format");
        e.printStackTrace();
    }
    finally
    {   // Clean up handles
        try
        {
            authenticationSocket.close();
            out.close();
            in.close();
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    System.out.println("Connection status =>" + connectionStatus);
    System.out.println("Connection host =>" + connectionHost);
    System.out.println("Connection port =>" + connectionPort);

    if (connectionStatus)
    {
        dataServerHost = connectionHost;
        dataServerPort = connectionPort;
        dataServerAuthKey = connectionAuthKey;
        System.out.println("Connecting to data server @: " + dataServerHost + ":" + dataServerPort);
        connectToDataServer();
    }
}
4

1 回答 1

2

我记得虚假选择器唤醒是可能的。

有趣的是,当您被告知有要读的东西时,没有什么可读的,但这对于程序来说通常不是问题。程序在读取 TCP 流时通常应该期望任意数量的字节;而0字节的情况通常不需要特殊处理。

你的程序理论上是错误的。您不能指望您可以一次阅读整封邮件。一次读取可能只返回其中的一部分。可能只有 1 个字节。没有保证。

“正义”的方式是将读取的所有字节累积在一个缓冲区中。在缓冲区中寻找 EOT。如果消息是分段的,则可能需要多次读取才能到达整条消息。

loop 
  select();
  if readable
     bytes = read()
     buffer.append(bytes)
     while( buffer has EOT at position i)
       msg = buffer[0-i]
       left shift buffer by i

你可以在这个流程中看到,read() 读取 0 字节并不重要。这真的与 NIO 无关。即使在传统的阻塞 TCP IO 中,这种策略也必须做到理论上正确。

但是,实际上,如果你确实观察到整个信息总是一体成型,你就不需要这么复杂了。您的原始代码在您的环境中实际上是正确的。

现在您观察到有时会读取 0 字节。那么你之前的实际假设必须被修改。您可以添加一些特殊的分支来忽略 0 字节块。

于 2011-06-08T02:27:22.707 回答