作为我研究的一部分,我正在用 Java 编写一个高负载 TCP/IP 回显服务器。我想为大约 3-4k 的客户提供服务,并查看每秒可以从中挤出的最大可能消息。消息大小非常小 - 最多 100 个字节。这项工作没有任何实际目的——只是一项研究。
根据我看过的大量演示文稿(HornetQ 基准测试、LMAX Disruptor 演讲等),现实世界的高负载系统往往每秒处理数百万个事务(我相信 Disruptor 提到了大约 6 mils 和 Hornet - 8.5)。例如,这篇文章指出可以实现高达 40M MPS。所以我把它作为对现代硬件应该具备的能力的粗略估计。
我写了最简单的单线程 NIO 服务器并启动了负载测试。我对在 localhost 上只能获得大约 100k MPS 而在实际网络上只能获得 25k 感到惊讶。数字看起来很小。我在 Win7 x64,核心 i7 上进行测试。查看 CPU 负载 - 只有一个内核处于忙碌状态(这在单线程应用程序中是预期的),而其余内核处于空闲状态。但是,即使我加载了所有 8 个内核(包括虚拟内核),我的 MPS 也不会超过 800k - 甚至不会接近 4000 万 :)
我的问题是:向客户提供大量消息的典型模式是什么?我是否应该在单个 JVM 内的多个不同套接字上分配网络负载,并使用诸如 HAProxy 之类的负载平衡器将负载分配到多个内核?或者我应该考虑在我的 NIO 代码中使用多个选择器?或者甚至可以在多个 JVM 之间分配负载并使用 Chronicle 在它们之间建立进程间通信?在像 CentOS 这样的适当服务器端操作系统上进行测试会产生很大的不同吗(也许是 Windows 让事情变慢了)?
下面是我的服务器的示例代码。对于任何传入的数据,它总是回答“ok”。我知道在现实世界中我需要跟踪消息的大小并准备好一条消息可能会在多个读取之间拆分,但是我现在想让事情变得超级简单。
public class EchoServer {
private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
private InetAddress hostAddress = null;
private int port;
private Selector selector;
private long loopTime;
private long numMessages = 0;
public EchoServer() throws IOException {
this(DEFAULT_PORT);
}
public EchoServer(int port) throws IOException {
this.port = port;
selector = initSelector();
loop();
}
private void loop() {
while (true) {
try{
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("Client is connected");
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
key.cancel();
socketChannel.close();
System.out.println("Forceful shutdown");
return;
}
if (numRead == -1) {
System.out.println("Graceful shutdown");
key.channel().close();
key.cancel();
return;
}
socketChannel.register(selector, SelectionKey.OP_WRITE);
numMessages++;
if (numMessages%100000 == 0) {
long elapsed = System.currentTimeMillis() - loopTime;
loopTime = System.currentTimeMillis();
System.out.println(elapsed);
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));
socketChannel.write(dummyResponse);
if (dummyResponse.remaining() > 0) {
System.err.print("Filled UP");
}
key.interestOps(SelectionKey.OP_READ);
}
private Selector initSelector() throws IOException {
Selector socketSelector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
serverChannel.socket().bind(isa);
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
public static void main(String[] args) throws IOException {
System.out.println("Starting echo server");
new EchoServer();
}
}