3

好吧,我要疯了。我一直在为我的服务器重写 NIO 代码,并且遇到了一些真正令人头疼的问题。底线是让 NIO “正确”是非常困难的。有人向我指出http://rox-xmlrpc.sourceforge.net/niotut/上的 Rox 教程,这似乎走上了一条不错的道路,但并不像我想要的那样完整。例如,我需要知道如何在发送了排队的传出 ByteBuffers 之后才关闭服务器端的连接。SocketChannel.close() 很突然,如果过早完成可能会丢失数据。我还需要发送大于读取的 ByteBuffer 的大数据包。Rox 代码(也不是我看过的任何其他代码)处理这个问题。还有很多地方似乎未正确处理未捕获的异常。在我的测试中存在一些错误,鉴于 NIO 的复杂性,目前尚不清楚如何正确处理它们。

无论如何,当我试图解决这些问题时,会出现更多棘手的细节,而且变得相当复杂。所以我正在考虑一种完全不同的方法。很多人都在说 NIO 非常容易出错,而且不必要地令人困惑和复杂。他们提倡使用“每个连接线程”模型,该模型使用阻塞 IO,其中每个套接字连接都在其自己的线程上运行。这似乎是一个好主意,并且可以通过为所有连接(如在 NIO 中)设置一个选择器线程来减少前端的瓶颈,但代价是更高的开销(对于线程)。这种观点在http://paultyma.blogspot.com/2008/03/writing-java-multithreaded-servers.htmlhttp://mailinator.blogspot.com/2008/02/kill-myth-等帖子中得到了回应请-nio-is-not-faster-than.html

与 NIO 相比,代码应该很简单,但我真的想看一些示例代码。我似乎找不到任何东西。问题是我不认为这种“每个连接的线程阻塞 I/O”策略有一个更好的名字,我实际上可以得到很好的谷歌结果。任何人都可以将我链接到一些教程或简单示例来解释使用这种“旧”的 I/O 方法并使用线程池对其进行扩展吗?或者有什么其他的智慧之言?非常感谢!

4

3 回答 3

2

如果您正在使用 NIO,我还建议您使用框架。我一直在使用Apache Mina,我会推荐它。

至于阻塞 IO,本质上您将需要一个侦听器线程来接受传入连接并生成处理每个连接的附加线程。这是一个这样的监听器代码示例,最初贡献给 Apache Felix 项目。如果您要查找完整但经过修改的版本,可以在此处浏览源代码

例如

    /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements.  See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License.  You may obtain a copy of the License at
    *
    *      http://www.apache.org/licenses/LICENSE-2.0
    *
        * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    package org.apache.felix.shell.remote;
    
    
    import java.io.IOException;
    import java.io.PrintStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketException;
    
    
    /**
     * Implements a simple listener that will accept a single connection.
     * <p/>
     *
     * @author Dieter Wimberger (wimpi)
     */
    class Listener
    {
    
        private int m_Port;
        private Thread m_ListenerThread;
        private boolean m_Stop = false;
        private ServerSocket m_ServerSocket;
        private AtomicInteger m_UseCounter;
        private int m_MaxConnections;
    
    
        /**
         * Activates this listener on a listener thread (telnetconsole.Listener).
         */
        public void activate()
        {
            //configure from system property
            try
            {
                m_Port = Integer.parseInt( System.getProperty( "osgi.shell.telnet.port", "6666" ) );
            }
            catch ( NumberFormatException ex )
            {
                Activator.getServices().error( "Listener::activate()", ex );
            }
            try
            {
                m_MaxConnections = Integer.parseInt( System.getProperty( "osgi.shell.telnet.maxconn", "2" ) );
            }
            catch ( NumberFormatException ex )
            {
                Activator.getServices().error( "Listener::activate()", ex );
            }
            m_UseCounter = new AtomicInteger( 0 );
            m_ListenerThread = new Thread( new Acceptor(), "telnetconsole.Listener" );
            m_ListenerThread.start();
        }//activate
    
    
        /**
         * Deactivates this listener.
         * <p/>
         * The listener's socket will be closed, which should cause an interrupt in the
         * listener thread and allow for it to return. The calling thread joins the listener
         * thread until it returns (to ensure a clean stop).
         */
        public void deactivate()
        {
            try
            {
                m_Stop = true;
                //wait for the listener thread
                m_ServerSocket.close();
                m_ListenerThread.join();
            }
            catch ( Exception ex )
            {
                Activator.getServices().error( "Listener::deactivate()", ex );
            }
        }//deactivate
    
        /**
         * Class that implements the listener's accept logic as a <tt>Runnable</tt>.
         */
        private class Acceptor implements Runnable
        {
    
            /**
             * Listens constantly to a server socket and handles incoming connections.
             * One connection will be accepted and routed into the shell, all others will
             * be notified and closed.
             * <p/>
             * The mechanism that should allow the thread to unblock from the ServerSocket.accept() call
             * is currently closing the ServerSocket from another thread. When the stop flag is set,
             * this should cause the thread to return and stop.
             */
            public void run()
            {
                try
                {
                    /*
                        A server socket is opened with a connectivity queue of a size specified
                        in int floodProtection.  Concurrent login handling under normal circumstances
                        should be handled properly, but denial of service attacks via massive parallel
                        program logins should be prevented with this.
                    */
                    m_ServerSocket = new ServerSocket( m_Port, 1 );
                    do
                    {
                        try
                        {
                            Socket s = m_ServerSocket.accept();
                            if ( m_UseCounter.get() >= m_MaxConnections )
                            {
                                //reject with message
                                PrintStream out = new PrintStream( s.getOutputStream() );
                                out.print( INUSE_MESSAGE );
                                out.flush();
                                //close
                                out.close();
                                s.close();
                            }
                            else
                            {
                                m_UseCounter.increment();
                                //run on the connection thread
                                Thread connectionThread = new Thread( new Shell( s, m_UseCounter ) );
                                connectionThread.start();
                            }
                        }
                        catch ( SocketException ex )
                        {
                        }
                    }
                    while ( !m_Stop );
    
                }
                catch ( IOException e )
                {
                    Activator.getServices().error( "Listener.Acceptor::activate()", e );
                }
            }//run
    
        }//inner class Acceptor
    
        private static final String INUSE_MESSAGE = "Connection refused.\r\n"
            + "All possible connections are currently being used.\r\n";
    
    }//class Listener

您可以在此处此处找到其他示例。

请注意,当您有更多负载时,NIO 相对于阻塞模型的优势就会发挥作用。从某一点开始,线程创建、管理和上下文切换的额外工作量将限制您的系统性能。

于 2010-01-28T15:42:49.903 回答
0

我建议您查看 JDK 中的 sample/nio 目录。这有许多简单的例子,包括你提到的两个。

于 2009-08-01T06:30:20.347 回答
0

您可能还想考虑使用更高级的框架,例如Grizzly,而不是直接使用 NIO。该框架应该允许您专注于您的用例,而不是 NIO 的微妙之处。

于 2009-08-05T10:46:18.927 回答