21

作为我研究的一部分,我正在用 Java 编写一个高负载 TCP/IP 回显服务器。我想为大约 3-4k 的客户提供服务,并查看每秒可以从中挤出的最大可能消息。消息大小非常小 - 最多 100 个字节。这项工作没有任何实际目的——只是一项研究。

根据我看过的大量演示文稿(HornetQ 基准测试、LMAX Disruptor 演讲等),现实世界的高负载系统往往每秒处理数百万个事务(我相信 Disruptor 提到了大约 6 mils 和 Hornet - 8.5)。例如,这篇文章指出可以实现高达 40M MPS。所以我把它作为对现代硬件应该具备的能力的粗略估计。

我写了最简单的单线程 NIO 服务器并启动了负载测试。我对在 localhost 上只能获得大约 100k MPS 而在实际网络上只能获得 25k 感到惊讶。数字看起来很小。我在 Win7 x64,核心 i7 上进行测试。查看 CPU 负载 - 只有一个内核处于忙碌状态(这在单线程应用程序中是预期的),而其余内核处于空闲状态。但是,即使我加载了所有 8 个内核(包括虚拟内核),我的 MPS 也不会超过 800k - 甚至不会接近 4000 万 :)

我的问题是:向客户提供大量消息的典型模式是什么?我是否应该在单个 JVM 内的多个不同套接字上分配网络负载,并使用诸如 HAProxy 之类的负载平衡器将负载分配到多个内核?或者我应该考虑在我的 NIO 代码中使用多个选择器?或者甚至可以在多个 JVM 之间分配负载并使用 Chronicle 在它们之间建立进程间通信?在像 CentOS 这样的适当服务器端操作系统上进行测试会产生很大的不同吗(也许是 Windows 让事情变慢了)?

下面是我的服务器的示例代码。对于任何传入的数据,它总是回答“ok”。我知道在现实世界中我需要跟踪消息的大小并准备好一条消息可能会在多个读取之间拆分,但是我现在想让事情变得超级简单。

public class EchoServer {

private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

private InetAddress hostAddress = null;

private int port;
private Selector selector;

private long loopTime;
private long numMessages = 0;

public EchoServer() throws IOException {
    this(DEFAULT_PORT);
}

public EchoServer(int port) throws IOException {
    this.port = port;
    selector = initSelector();
    loop();
}

private void loop() {
    while (true) {
        try{
            selector.select();
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                } else if (key.isWritable()) {
                    write(key);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

private void accept(SelectionKey key) throws IOException {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    SocketChannel socketChannel = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
    socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
    socketChannel.register(selector, SelectionKey.OP_READ);

    System.out.println("Client is connected");
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(readBuffer);
    } catch (IOException e) {
        key.cancel();
        socketChannel.close();

        System.out.println("Forceful shutdown");
        return;
    }

    if (numRead == -1) {
        System.out.println("Graceful shutdown");
        key.channel().close();
        key.cancel();

        return;
    }

    socketChannel.register(selector, SelectionKey.OP_WRITE);

    numMessages++;
    if (numMessages%100000 == 0) {
        long elapsed = System.currentTimeMillis() - loopTime;
        loopTime = System.currentTimeMillis();
        System.out.println(elapsed);
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));

    socketChannel.write(dummyResponse);
    if (dummyResponse.remaining() > 0) {
        System.err.print("Filled UP");
    }

    key.interestOps(SelectionKey.OP_READ);
}

private Selector initSelector() throws IOException {
    Selector socketSelector = SelectorProvider.provider().openSelector();

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
    serverChannel.socket().bind(isa);
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
    return socketSelector;
}

public static void main(String[] args) throws IOException {
    System.out.println("Starting echo server");
    new EchoServer();
}
}
4

3 回答 3

24
what is a typical pattern for serving massive amounts of messages to clients?

有许多可能的模式:在不经过多个 jvm 的情况下利用所有内核的简单方法是:

  1. 让一个线程接受连接并使用选择器读取。
  2. 一旦你有足够的字节来构成一条消息,就可以使用像环形缓冲区这样的结构将它传递给另一个核心。Disruptor Java 框架非常适合这一点。如果需要知道什么是完整消息的处理是轻量级的,那么这是一个很好的模式。例如,如果您有一个以长度为前缀的协议,您可以等到获得预期的字节数,然后将其发送到另一个线程。如果协议的解析非常繁重,那么您可能会压倒这个单线程,阻止它接受连接或读取网络字节。
  3. 在从环形缓冲区接收数据的工作线程上,进行实际处理。
  4. 您可以在工作线程上或通过其他聚合器线程写出响应。

这就是它的要点。这里有更多可能性,答案实际上取决于您正在编写的应用程序的类型。几个例子是:

  1. 一个 CPU 繁重的无状态应用程序说一个图像处理应用程序。每个请求完成的 CPU/GPU 工作量可能会大大高于由非常幼稚的线程间通信解决方案产生的开销。在这种情况下,一个简单的解决方案是一堆工作线程从单个队列中提取工作。请注意,这是一个单个队列,而不是每个工作人员一个队列。优点是这本质上是负载平衡的。每个工人完成它的工作,然后轮询单生产者多消费者队列。即使这是争用的根源,图像处理工作(秒?)应该比任何同步替代方案都要昂贵得多。
  2. 一个纯粹的 IO 应用程序,例如一个统计服务器,它只是为请求增加一些计数器:在这里你几乎不需要 CPU 繁重的工作。大部分工作只是读取字节和写入字节。多线程应用程序可能不会给您带来显着的好处。事实上,如果排队项目所需的时间超过处理它们所需的时间,它甚至可能会减慢速度。单线程 Java 服务器应该能够轻松地使 1G 链路饱和。
  3. 需要适度处理的有状态应用程序,例如典型的业务应用程序:这里每个客户端都有一些状态来确定如何处理每个请求。假设我们使用多线程,因为处理很重要,我们可以将客户端关联到某些线程。这是演员架构的变体:

    i)当客户端第一次将哈希连接到工作人员时。您可能希望使用一些客户端 ID 来执行此操作,这样如果它断开连接并重新连接,它仍会分配给同一个工作人员/参与者。

    ii) 当阅读器线程读取完整的请求时,将其放在正确的工作人员/参与者的环形缓冲区中。由于同一个工作人员总是处理特定的客户端,因此所有状态都应该是线程本地的,从而使所有处理逻辑变得简单且单线程。

    iii) 工作线程可以写出请求。总是尝试只做一个 write()。如果您的所有数据都无法写出,那么您是否注册 OP_WRITE。如果确实有一些未完成的事情,工作线程只需要进行选择调用。大多数写入应该会成功,从而使这变得不必要。这里的技巧是在选择调用和轮询环形缓冲区以获取更多请求之间取得平衡。您还可以使用单个编写器线程,其唯一职责是写出请求。每个工作线程都可以将它的响应放在将它连接到这个单个写入器线程的环形缓冲区上。单写线程循环轮询每个传入的环形缓冲区并将数据写出到客户端。

正如您所指出的,还有许多其他选择:

Should I distribute networking load over several different sockets inside a single JVM and use some sort of load balancer like HAProxy to distribute load to multiple cores?

您可以这样做,但恕我直言,这不是负载均衡器的最佳用途。这确实为您购买了独立的 JVM,它们可能会自行失败,但可能比编写多线程的单个 JVM 应用程序要慢。应用程序本身可能更容易编写,因为它将是单线程的。

Or I should look towards using multiple Selectors in my NIO code?

你也可以这样做。查看 Ngnix 架构以获取有关如何执行此操作的一些提示。

Or maybe even distribute the load between multiple JVMs and use Chronicle to build an inter-process communication between them? 这也是一种选择。Chronicle 为您提供了一个优势,即内存映射文件对中间退出的进程更具弹性。由于所有通信都是通过共享内存完成的,因此您仍然可以获得足够的性能。

Will testing on a proper serverside OS like CentOS make a big difference (maybe it is Windows that slows things down)?

我不知道这件事。不太可能。如果 Java 最充分地使用原生 Windows API,那么它应该没那么重要。我非常怀疑每秒 4000 万次交易的数字(没有用户空间网络堆栈 + UDP),但我列出的架构应该做得很好。

这些架构往往表现良好,因为它们是使用基于有界数组的数据结构进行线程间通信的单写入器架构。确定多线程是否甚至是答案。在许多情况下,它是不需要的,并且可能导致减速。

另一个需要研究的领域是内存分配方案。具体来说,分配和重用缓冲区的策略可能会带来显着的好处。正确的缓冲区重用策略取决于应用程序。看看伙伴内存分配、竞技场分配等方案,看看它们是否能让你受益。JVM GC 对于大多数工作负载都做得很好,所以在你走这条路之前总是要测量。

协议设计对性能也有很大影响。我倾向于使用长度前缀协议,因为它们允许您分配正确大小的缓冲区,避免缓冲区列表和/或缓冲区合并。以长度为前缀的协议还使决定何时移交请求变得容易——只需检查一下num bytes == expected。实际的解析可以由工作线程完成。序列化和反序列化超出了以长度为前缀的协议。像在缓冲区而不是分配上的享元模式这样的模式在这里有所帮助。查看SBE了解其中的一些原则。

正如你可以想象的那样,一整篇论文都可以写在这里。这应该使您朝着正确的方向前进。警告:始终测量并确保您需要比最简单的选项更高的性能。很容易陷入性能改进的永无止境的黑洞。

于 2014-01-08T04:17:57.390 回答
6

你关于写作的逻辑是错误的。您应该立即尝试写入您有数据要写入。如果write()返回零,则注册 OP_WRITE,当通道变为可写时重试写入,并OP_WRITE在写入成功时取消注册。你在这里增加了大量的延迟。OP_READ在执行所有这些操作时取消注册会增加更多延迟。

于 2014-01-07T11:56:45.633 回答
2

使用常规硬件,您将获得每秒数十万个请求。至少这是我尝试构建类似解决方案的经验,Tech Empower Web Frameworks Benchmark似乎也同意这一点。

通常,最好的方法取决于您是否有 io-bound 或 cpu-bound 负载。

对于 io-bound 负载(高延迟),您需要使用许多线程执行异步 io。为了获得最佳性能,您应该尽量避免线程之间的切换。因此,拥有一个专用的选择器线程和另一个线程池进行处理比拥有一个每个线程都进行选择或处理的线程池要慢,因此在最好的情况下(如果 io 立即可用),请求由单个线程处理。这种类型的设置代码更复杂但速度更快,我不相信任何异步 Web 框架都充分利用了这一点。

对于 cpu-bound 负载,每个请求一个线程通常是最快的,因为您避免了上下文切换。

于 2014-01-08T06:32:46.750 回答