我将 Netty 用于多线程 TCP 服务器和单个客户端持久连接。客户端发送许多二进制消息(在我的用例中为 10000 条),并且应该接收每条消息的答案。我在管道中添加了一个 OrderedMemoryAwareThreadPoolExecutor 来处理在多个线程上执行 DB 调用。
如果我在 messageReceived() 方法中运行数据库调用(或使用 Thread.currentThread().sleep(50) 模拟它),那么所有事件都由单个线程处理。
5 count of {main}
1 count of {New
10000 count of {pool-3-thread-4}
对于 messageReceived() 的简单实现,服务器按预期创建了许多执行线程。
请问我应该如何配置 ExecutionHandler 以获取业务逻辑的多个线程执行器?
这是我的代码:
public class MyServer {
public void run() {
OrderedMemoryAwareThreadPoolExecutor eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 1048576L, 1048576L, 1000, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory());
ExecutionHandler executionHandler = new ExecutionHandler(eventExecutor);
bootstrap.setPipelineFactory(new ServerChannelPipelineFactory(executionHandler));
}
}
public class ServerChannelPipelineFactory implements ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
pipeline.addLast("encoder", new MyProtocolEncoder());
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("executor", executionHandler);
pipeline.addLast("myHandler", new MyServerHandler(dataSource));
}
}
public class MyServerHandler extends SimpleChannelHandler {
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws DBException {
// long running DB call simulation
try {
Thread.currentThread().sleep(50);
} catch (InterruptedException ex) {
}
// a simple message
final MyMessage answerMsg = new MyMessage();
if (e.getChannel().isWritable()) {
e.getChannel().write(answerMsg);
}
}
}