4

我正在分析的代码使用 Netty NioDatagramChannelFactory 创建 UDP 服务器。它创建一个线程池:

ExecutorService threadPool = Executors.newCachedThreadPool();

然后是数据报通道、pipelineFactory & bootstrap:

int workerCount = 10;
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory();

ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory);
bootStrap.setPipelineFactory(pipelineFactory);
bootStrap.bind(new InetSocketAddress(host, port));

在 pipelineFactory 中,getPipeline() 添加了自定义处理程序。

就像在: UDP 消息的多线程处理中所说的那样

只有一个线程处理收到的消息。在日志中,线程名称显示为New I/O datagram worker #1,如:

2012-04-20 09:20:51,853 新的 I/O 数据报工作人员 #1'-'1 INFO [cemrshSNMPTrapsRequestHandler:42] messageReceived | 处理:V1TRAP[reqestID=0, ...]

我阅读了文档和这个条目:Lot of UDP requests lost in UDP server with Netty

然后我根据这些条目更改了一些代码。现在创建线程池:

int corePoolSize = 5;
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576);

以及带有 ExecutionHandler 的 pipelineFactory:

ExecutionHandler executionHandler = new ExecutionHandler(threadPool);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler);

并且 getPipeline() 添加了如下所述的处理程序:

public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory {

    private ExecutionHandler executionHandler = null;

    public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) { 
        this.executionHandler = executionHandler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {

        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addFirst("ExecutorHandler", executionHandler);

        // Here the custom handlers are added
        pipeline.addLast( ... )
    }

现在,我在日志中得到了 4 个不同的线程名称。它们显示为pool-2-thread-1pool-2-thread-2等...

例如:

2012-05-09 09:12:19,589 pool-2-thread-1 信息 [cemrshSNMPTrapsRequestHandler:46] messageReceived | 处理:V1TRAP[reqestID=0, ...]

但它们不是同时处理的。messageReceived() 下的处理必须在一个线程上完成,以便下一个线程处理下一条消息。我从不同的客户端向服务器发送了大量消息,并且我得到的日志不是交错的。我还尝试在 messageReceived() 中使用 Thread.sleep(),并确认了前面的内容。

我错过了什么吗?有没有办法用 Netty 实现真正的多线程 UDP 服务器?如何让不同的线程同时执行 messageReceived()?

4

2 回答 2

1

根据我的经验和对 UDP 的 Netty 的理解,只有一个线程处理 UDP 消息进行解码是正常的。由于 UDP 是无会话的,因此只有一个线程可以在一个 UDP 端口上接收数据并对其进行解码。

一旦您解码了数据并将其包装到缓冲区或特定的 java 对象中,您就可以将该对象放入将处理它的线程池(执行处理程序 -> 您的业务处理程序)。然后,一旦您将先前解码的数据释放到执行处理程序中,就可以解码 UDP 端口上即将出现的新数据。

您可以在创建 NioDatagramChannelFactory 时指定的池线程仅在您侦听多个端口上的数据时使用。每个端口只有一个线程有意义。即使您在该构造函数中指定了 100 个工作器,如果您配置了一个 UDP 端口,也只会使用一个。

于 2012-05-13T18:07:56.727 回答
0

让我大吃一惊的一件事是,您将执行处理程序放在管道中的首位。我相信其意图是直到“应用程序”处理程序的整个管道应该由执行 IO解码的 IO 线程执行。

因此,我会断言您首先要添加所有 SNMPTrap 解码处理程序,然后,当您有一个实际的 SNMPTrap 时,它会被移交给执行处理程序,该处理程序又将陷阱传递给陷阱的实际使用者做一些有用的事情。

@Override
public ChannelPipeline getPipeline() throws Exception {

    ChannelPipeline pipeline = Channels.pipeline(
         new SomethingSomethingDecoder(),
         new SNMPTrapDecoder(),
         executionHandler.
         snmpTrapConsumerHandler
    );
}

至少,ExecutionHandler javadoc中是这样显示的,以上是我对它的解释。

于 2012-05-09T21:05:23.037 回答