我正在分析的代码使用 Netty NioDatagramChannelFactory 创建 UDP 服务器。它创建一个线程池:
ExecutorService threadPool = Executors.newCachedThreadPool();
然后是数据报通道、pipelineFactory & bootstrap:
int workerCount = 10;
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory();
ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory);
bootStrap.setPipelineFactory(pipelineFactory);
bootStrap.bind(new InetSocketAddress(host, port));
在 pipelineFactory 中,getPipeline() 添加了自定义处理程序。
就像在: UDP 消息的多线程处理中所说的那样
只有一个线程处理收到的消息。在日志中,线程名称显示为New I/O datagram worker #1,如:
2012-04-20 09:20:51,853 新的 I/O 数据报工作人员 #1'-'1 INFO [cemrshSNMPTrapsRequestHandler:42] messageReceived | 处理:V1TRAP[reqestID=0, ...]
我阅读了文档和这个条目:Lot of UDP requests lost in UDP server with Netty
然后我根据这些条目更改了一些代码。现在创建线程池:
int corePoolSize = 5;
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576);
以及带有 ExecutionHandler 的 pipelineFactory:
ExecutionHandler executionHandler = new ExecutionHandler(threadPool);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler);
并且 getPipeline() 添加了如下所述的处理程序:
public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory {
private ExecutionHandler executionHandler = null;
public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) {
this.executionHandler = executionHandler;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addFirst("ExecutorHandler", executionHandler);
// Here the custom handlers are added
pipeline.addLast( ... )
}
现在,我在日志中得到了 4 个不同的线程名称。它们显示为pool-2-thread-1、pool-2-thread-2等...
例如:
2012-05-09 09:12:19,589 pool-2-thread-1 信息 [cemrshSNMPTrapsRequestHandler:46] messageReceived | 处理:V1TRAP[reqestID=0, ...]
但它们不是同时处理的。messageReceived() 下的处理必须在一个线程上完成,以便下一个线程处理下一条消息。我从不同的客户端向服务器发送了大量消息,并且我得到的日志不是交错的。我还尝试在 messageReceived() 中使用 Thread.sleep(),并确认了前面的内容。
我错过了什么吗?有没有办法用 Netty 实现真正的多线程 UDP 服务器?如何让不同的线程同时执行 messageReceived()?