0

我将 Netty 用于多线程 TCP 服务器和单个客户端持久连接。客户端发送许多二进制消息(在我的用例中为 10000 条),并且应该接收每条消息的答案。我在管道中添加了一个 OrderedMemoryAwareThreadPoolExecutor 来处理在多个线程上执行 DB 调用。

如果我在 messageReceived() 方法中运行数据库调用(或使用 Thread.currentThread().sleep(50) 模拟它),那么所有事件都由单个线程处理。

    5 count of {main}
    1 count of {New
10000 count of {pool-3-thread-4}

对于 messageReceived() 的简单实现,服务器按预期创建了许多执行线程。

请问我应该如何配置 ExecutionHandler 以获取业务逻辑的多个线程执行器?

这是我的代码:

public class MyServer {

      public void run() {
            OrderedMemoryAwareThreadPoolExecutor eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 1048576L, 1048576L, 1000, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory());  
            ExecutionHandler executionHandler = new ExecutionHandler(eventExecutor);        
            bootstrap.setPipelineFactory(new ServerChannelPipelineFactory(executionHandler));
      }
    }  



    public class ServerChannelPipelineFactory implements ChannelPipelineFactory {

      public ChannelPipeline getPipeline() throws Exception {

        pipeline.addLast("encoder", new MyProtocolEncoder());
        pipeline.addLast("decoder", new MyProtocolDecoder());
        pipeline.addLast("executor", executionHandler);
        pipeline.addLast("myHandler", new MyServerHandler(dataSource));

      }
    }

    public class MyServerHandler extends SimpleChannelHandler {

      public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws DBException {


          // long running DB call simulation
          try {
            Thread.currentThread().sleep(50);
          } catch (InterruptedException ex) {

          }  

          // a simple message  
          final MyMessage answerMsg = new MyMessage();
          if (e.getChannel().isWritable()) {
            e.getChannel().write(answerMsg);
          }  
      }      
    }
4

1 回答 1

3

OrderedMemoryAwareThreadPoolExecutor 保证来自单个通道的事件按顺序处理。您可以将其视为将通道绑定到池中的特定线程,然后处理该线程上的所有事件 - 尽管它比这更复杂一些,因此不要依赖始终由同一线程处理的通道。

如果您启动第二个客户端,您将看到它(很可能)在池中的另一个线程上进行处理。如果您真的可以并行处理单个客户端的请求,那么您可能需要 MemoryAwareThreadPoolExecutor 但请注意,这不能保证通道事件的顺序。

于 2013-02-08T14:48:27.587 回答