我是 Netty 的新手,对我来说它似乎很重量级,所以我决定对其性能进行一些研究。我试图实现一个执行以下操作的服务器:
- 从某些来源(许多 UDP 端口)接收 UDP 数据报。
- 监听 TCP 连接。
- 将接收到的数据报发送到连接的 TCP 客户端。
客户端的数量非常少,我在测试中使用了 10 个,但 UDP 流非常繁重 - 50 个流,每个流大约 200 kbyte/sec,这使得它大约 10 MB/sec。我用一个单线程应用程序模拟了这些流,每个应用程序发送 200 个 1440 字节的数据包(每个端口 4 个数据包),然后休眠 28 毫秒等等(这真的给了我大约 8000 kB/s,我猜是因为高负载和不准确的睡眠时间)。
现在,我发现这不是一个非常高的负载,但我的 PC 也很慢 - 一些旧的 2 核 Intel E4600。板载 Windows 7 x64。
我启动了三个程序:发送者(模仿者)、服务器和客户端。都在同一台机器上,我想这不是测试它的最佳方法,但至少它应该允许我比较服务器的不同实现如何与相同的模仿者和客户端一起工作。
数据包结构如下所示:8 字节时间戳、8 字节数据包编号(以 0 开头)、1 字节端口标识符和 1 字节“子流”标识符。这个想法是 50 个端口中的每一个都有 4 个子流,所以我实际上有 200 个独立的数据包流,分组为 50 个 UDP 流。
结果有些出人意料。使用普通的每个客户端线程服务器,我获得了大约 7500 kB/s 的吞吐量,并且丢包率非常低。实际上每个客户端有两个线程(另一个在 read() 上被阻塞,以防客户端发送某些东西,但它没有发送)和 50 个用于 UDP 接收的线程。CPU 负载约为 60%。
使用 OIO Netty 服务器,我在客户端获得大约 6000 kB/s 的数据包丢失。低水位线设置为 50 MB,高水位线设置为 100 MB!CPU 负载为 80%,这也不是一个好兆头。
使用 NIO Netty 服务器,我得到大约 4500 kB/s,但由于某些莫名其妙的原因没有损失。也许它减慢了我的发件人进程?但这没有任何意义:CPU 负载约为 60%,而且 NIO 不应该使用大量可能阻碍发送方调度的线程......
这是我的 Netty 服务器实现:
public class NettyServer {
public static void main(String[] args) throws Exception {
new NettyServer(Integer.parseInt(args[0])).run();
}
private final int serverPort;
private NettyServer(int serverPort) {
this.serverPort = serverPort;
}
private void run() throws InterruptedException {
boolean nio = false;
EventLoopGroup bossGroup;
EventLoopGroup workerGroup;
EventLoopGroup receiverGroup;
if (nio) {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
receiverGroup = new NioEventLoopGroup();
} else {
bossGroup = new OioEventLoopGroup();
workerGroup = new OioEventLoopGroup();
receiverGroup = new OioEventLoopGroup();
}
final List<ClientHandler> clients
= Collections.synchronizedList(new LinkedList<ClientHandler>());
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup).channel(
nio ? NioServerSocketChannel.class : OioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.config().setWriteBufferHighWaterMark(1024 * 1024 * 100);
ch.config().setWriteBufferLowWaterMark(1024 * 1024 * 50);
final ClientHandler client = new ClientHandler(clients);
ch.pipeline().addLast(client);
}
});
server.bind(serverPort).sync();
Bootstrap receiver = new Bootstrap();
receiver.group(receiverGroup);
receiver.channel(nio ? NioDatagramChannel.class : OioDatagramChannel.class);
for (int port = 18000; port < 18000 + 50; ++port) {
receiver.handler(new UDPHandler(clients));
receiver.bind(port).sync();
}
}
}
class UDPHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private final Collection<ClientHandler> clients;
private static final long start = System.currentTimeMillis();
private static long sum = 0;
private static long count = 0;
private final Long[][] lastNum = new Long[50][4];
public UDPHandler(Collection<ClientHandler> clients){
this.clients = clients;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
final ByteBuf content = msg.content();
final int length = content.readableBytes();
synchronized (UDPHandler.class) {
sum += length;
if (++count % 10000 == 0) {
final long now = System.currentTimeMillis();
System.err.println((sum / (now - start)) + " kB/s");
}
}
long num = content.getLong(8);
// this basically identifies the sender port
// (0-50 represents ports 18000-18050)
int nip = content.getByte(16) & 0xFF;
// and this is "substream" within one port (0-3)
int stream = content.getByte(17) & 0xFF;
// the last received number for this nip/stream combo
Long last = lastNum[nip][stream];
if (last != null && num - last != 1) {
// number isn't incremented by 1, so there's packet loss
System.err.println("lost " + (num - last - 1));
}
lastNum[nip][stream] = num;
synchronized (clients) {
for (ClientHandler client : clients) {
final ByteBuf copy = content.copy();
client.send(copy);
}
}
}
}
public class ClientHandler extends ChannelInboundHandlerAdapter {
private final static Logger logger
= Logger.getLogger(ClientHandler.class.getName());
private ByteBuf buffer;
private final Collection<ClientHandler> clients;
private Channel channel;
ClientHandler(Collection<ClientHandler> clients) {
this.clients = clients;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
clients.add(this);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
clients.remove(this);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (!(cause instanceof IOException)) {
logger.log(Level.SEVERE, "A terrible thing", cause);
}
}
void send(ByteBuf msg) {
if (channel.isWritable()) {
channel.writeAndFlush(msg);
} else {
msg.release();
}
}
}
分析显示,我的简单服务器实现大约 83% 用于阻塞 UDP 读取,12% 用于等待锁(如果这就是 sun.misc.Unsafe.park() 所做的),大约 4.5% 用于阻塞 TCP 写入。
OIO 服务器大约 75% 用于阻止 UDP 读取,11% 用于阻止 TCP 读取(为什么?),6% 用于我的 UDP 处理程序(为什么这么多?)和 4% 用于阻止 TCP 写入。
NIO服务器在选择上花费了97.5%,这应该是一个好兆头。没有损失也是一个好兆头,并且 CPU 负载与我的普通服务器相同,看起来一切都很好,只要吞吐量不慢 2 倍!
所以这是我的问题:
- Netty 对这样的任务有效还是只对大量连接/请求有用?
- 为什么 OIO 实现会吃掉这么多 CPU 并丢包?与普通的每个客户端 2 线程有什么不同?我怀疑这仅仅是因为管道等实用数据结构造成的一些开销。
- 当我切换到 NIO 时会发生什么?如何减慢速度但不丢失任何数据包?我肯定会认为我的代码有问题,但如果我只切换到 OIO 而不修改任何内容,它似乎可以获得所有 8000 kB/s 的流量。那么我的代码中是否存在仅发生在 NIO 上的错误?