2

举个简单的例子,假设我想在 netty 中只使用 2 个工作线程来处理 3 个同时的 TCP 客户端连接,我该怎么做?

问题 A) 使用下面的代码,我的第三个连接没有从服务器获取任何数据 - 连接就在那里。注意 - 我的工作执行程序和工作人员计数如何为 2。所以如果我有 2 个工作线程和 3 个连接,是否所有三个连接都应该由 2 个线程提供服务?

B) 另一个问题是 - netty 是否使用 java.util.concurrent 的 CompletionService?它似乎没有使用它。另外,我没有看到任何执行 executor.submit 或 future.get 的源代码所以所有这些都增加了它如何处理和提供数据到比它的工作线程更多的连接的混乱?

C)我对netty如何处理10000多个同时TCP连接感到迷茫......它会创建10000个线程吗?每个连接的线程不是一个可扩展的解决方案,所以我很困惑,因为我的测试代码没有按预期工作。

    import java.net.InetSocketAddress;
    import java.nio.channels.ClosedChannelException;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    import org.jboss.netty.handler.codec.string.StringEncoder;

    public class SRNGServer {

      public static void main(String[] args) throws Exception {
          // Configure the server.
          ServerBootstrap bootstrap = new ServerBootstrap(
                  new NioServerSocketChannelFactory(
                          Executors.newCachedThreadPool(),
                          //Executors.newCachedThreadPool()
                          Executors.newFixedThreadPool(2),2
                          ));

          // Configure the pipeline factory.
          bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP());

          // Bind and start to accept incoming connections.
          bootstrap.bind(new InetSocketAddress(8080));
      }



      private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler {

        private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName());


        @Override
        public void channelConnected(
                ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

          // Send greeting for a new connection.
          Channel ch=e.getChannel();

          System.out.printf("channelConnected with channel=[%s]%n", ch);

          ChannelFuture writeFuture=e.getChannel().write("It is " + new Date() + " now.\r\n");

          SRNGChannelFutureListener srngcfl=new SRNGChannelFutureListener();

          System.out.printf("Registered listener=[%s] for future=[%s]%n", srngcfl, writeFuture);

          writeFuture.addListener(srngcfl);      

        }

        @Override
        public void exceptionCaught(
                ChannelHandlerContext ctx, ExceptionEvent e) {

            logger.log(
                    Level.WARNING,
                    "Unexpected exception from downstream.",
                    e.getCause());
            if(e.getCause() instanceof ClosedChannelException){
              logger.log(Level.INFO, "****** Connection closed by client - Closing Channel");
            }
            e.getChannel().close();
        }
      }



      private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory {

        public ChannelPipeline getPipeline() throws Exception {

            // Create a default pipeline implementation.
            ChannelPipeline pipeline = Channels.pipeline();

            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new SRNGServerHandlerP());

            return pipeline;
        }
      }


      private static class SRNGChannelFutureListener implements ChannelFutureListener{

        public void operationComplete(ChannelFuture future) throws InterruptedException{
          Thread.sleep(1000*5);
          Channel ch=future.getChannel();
          if(ch!=null && ch.isConnected()){
              ChannelFuture writeFuture=ch.write("It is " + new Date() + " now.\r\n");
              //-- Add this instance as listener itself.
              writeFuture.addListener(this);
          }

        }

      }
    }
4

1 回答 1

4

我没有详细分析你的源代码,所以我不知道它为什么不能正常工作。但是这条线SRNGChannelFutureListener看起来很可疑:

Thread.sleep(1000*5);

这将使执行它的线程被锁定 5 秒;在此期间,该线程将无法进行任何其他处理。

问题 C:不,它不会创建 10,000 个线程;Netty 的全部意义在于它不会那样做,因为那样确实不能很好地扩展。相反,它使用线程池中有限数量的线程,每当发生某些事情时生成事件,并在池中的线​​程上运行事件处理程序。因此,线程和连接彼此分离(每个连接都没有线程)。

为了使这种机制正常工作,您的事件处理程序应尽快返回,以使它们运行的​​线程可用于尽快运行下一个事件处理程序。如果您让线程休眠 5 秒,那么您将保持线程的分配状态,因此它不能用于处理其他事件。

问题 B:如果你真的想知道,你可以将源代码拿到 Netty 并找出答案。它使用选择器和其他java.nio类来执行异步 I/O

于 2011-07-23T06:44:17.860 回答