5

我一直在为多媒体消息构建一个高吞吐量的服务器应用程序,实现语言是C++。每台服务器可以独立使用,也可以将多台服务器连接在一起,创建一个基于 DHT 的覆盖网络;服务器就像 Skype 的情况一样,就像超级对等点一样。

工作正在进行中。目前,服务器每秒可以处理大约 200,000 条消息(256 字节消息),并且在我的机器(Intel i3 Mobile 2 GHz、Fedora Core 18(64 位)、4 GB RAM)上的最大吞吐量约为 256 MB/s长度为 4096 字节的消息。服务器有两个线程,一个用于处理所有 IO(基于 epoll,边缘触发),另一个用于处理传入消息。覆盖管理还有另一个线程,但在当前讨论中并不重要。

讨论中的两个线程使用两个循环缓冲区共享数据。线程 1 使用一个循环缓冲区为线程 2 排队新消息,而线程 2 通过另一个循环缓冲区返回处理后的消息。服务器完全无锁。我没有使用任何同步原语,甚至没有使用原子操作。

循环缓冲区永远不会溢出,因为消息是池化的(在开始时预先分配)。事实上,所有重要/经常使用的数据结构都被池化以减少内存碎片并提高缓存效率,因此我们知道在服务器启动时我们将要创建的最大消息数,因此我们可以预先计算最大值缓冲区的大小,然后相应地初始化循环缓冲区。

现在我的问题:线程 #1 一次将序列化消息排入队列(实际上是指向消息对象的指针),而线程 #2 以块的形式从队列中取出消息(32/64/128 的块),然后返回通过第二个循环缓冲区以块的形式处理消息。如果没有新消息,线程 #2 会一直忙于等待,从而使 CPU 内核之一一直处于忙碌状态。如何进一步改进设计?忙碌等待策略的替代方案是什么?我想优雅而高效地做到这一点。我考虑过使用信号量,但我担心这不是最好的解决方案,原因很简单,每次我在线程 #1 中排队一条消息时都必须调用“sem_post”,这可能会大大降低吞吐量,第二个线程必须调用“sem_post”等于防止信号量溢出的次数。另外我担心信号量实现可能在内部使用互斥锁。

第二个不错的选择可能是使用信号,如果我可以发现仅当第二个线程“清空队列并且正在调用 sigwait”或“已经在 sigwait 上等待”时才发出信号的算法,简而言之就是信号必须至少提高几次,尽管如果信号的提高次数比需要的多几次也不会受到伤害。是的,我确实使用了 Google 搜索,但我在 Internet 上找到的解决方案都不令人满意。以下是一些注意事项:

A. 服务器在进行系统调用时必须浪费最少的 CPU 周期,并且必须使用最少的系统调用次数。

B. 必须有非常低的开销并且算法必须是高效的。

C. 没有任何锁定。

我希望所有选项都摆在桌面上。

这是我共享服务器信息的网站的链接,以便更好地了解其功能和目的:www.wanhive.com

4

3 回答 3

2

如果您需要尽快唤醒线程#2,那么忙等待是好的。事实上,这是通知一个处理器有关另一个处理器所做更改的最快方式。您需要在两端生成内存栅栏(一侧写栅栏,另一侧读栅栏)。但是只有当您的两个线程都在专用处理器上执行时,此语句才成立。在这种情况下,不需要上下文切换,只需要缓存一致性流量。

可以进行一些改进。

  1. 如果线程 #2 通常受 CPU 限制并且忙于等待 - 它可能会受到调度程序的惩罚(至少在 Windows 和 linux 上)。操作系统调度程序动态调整线程优先级以提高整体系统性能。它降低了消耗大量 CPU 时间的 CPU 绑定线程的优先级,以防止线程饥饿。您需要手动增加线程#2 的优先级以防止这种情况发生。
  2. 如果您有多核或多处理器机器,您最终会遇到处理器订阅不足的情况,您的应用程序将无法利用硬件并发性。您可以通过使用多个处理器线程(线程 #2)来缓解这种情况。

处理步骤的并行化。 有两种选择。

  1. 您的消息是完全有序的,需要按照它们到达时的相同顺序进行处理。
  2. 消息可以重新排序。可以按任何顺序进行处理。

在第一种情况下,您需要 N 个循环缓冲区和 N 个处理线程以及 N 个输出缓冲区和一个使用者线程。线程 #1 在该循环缓冲区中以循环顺序将消息排入队列。

// Thread #1 pseudocode
auto message = recv()
auto buffer_index = atomic_increment(&message_counter);
buffer_index = buffer_index % N;  // N is the number of threads
// buffers is an array of cyclic buffers - Buffer* buffers[N];
Buffer* current_buffer = buffers[buffer_index];
current_buffer->euqueue(message);

每个线程使用来自其中一个缓冲区的消息并将结果排入其专用输出缓冲区。

// Thread #i pseudocode
auto message = my_buffer->dequeue();
auto result = process(message);
my_output_buffer->enqueue(result);

现在您需要按到达顺序处理所有这些消息。您可以使用专用的消费者线程通过以循环方式从输出循环缓冲区中出列已处理的消息来执行此操作。

// Consumer thread pseudocode
// out_message_counter is equal to message_counter at start
auto out_buffer_index = atomic_increment(&out_message_counter);
out_buffer_index = out_buffer_index % N;
// out_buffers is array of output buffers that is used by processing
// threads
auto out_buffer = out_buffers[out_buffer_index];
auto result = out_buffer->dequeue();
send(result);  // or whatever you need to do with result

在第二种情况下,当您不需要保留消息顺序时 - 您不需要消费者线程和输出循环缓冲区。您只需对处理线程的结果做任何您需要做的事情。

N 必须等于num CPU's- 3 在第一种情况下(“-3”是一个 I/O 线程 + 一个消费者线程 + 一个 DHT 线程)和num CPU's- 2 在第二种情况下(“-2”是一个 I/O 线程 + 一个 DHT 线程)。这是因为如果您的处理器超额使用,忙等待就不会有效。

于 2013-12-02T08:53:08.497 回答
1

Sounds like you want to coordinate a producer and consumer connected by some shared state. At least in Java for such patterns, one way to avoid busy wait is to use wait and notify. With this approach, thread #2 can go into a blocked state if it finds that the queue is empty by calling wait and avoid spinning the CPU. Once thread #1 puts some stuff in the queue, it can do a notify. A quick search of such mechanisms in C++ yields this:

wait and notify in C/C++ shared memory

于 2013-12-02T08:21:49.130 回答
0

You can have thread #2 go to sleep for X miliseconds when the queue is empty.

X can be determined by the length of the queues you want + some guard band.

BTW, in user mode (ring3) you can't use MONITOR/MWAIT instructions which would be ideal for your question.

Edit

You should definitely give TBB's RWlock a try (there's a free version). Sounds like what you're looking for.

Edit2

Another option is to use conditional variables. They involve a mutex and a condition. Basically you wait on the condition to become "true". The low level pthread stuff can be found here.

于 2013-12-02T07:17:37.150 回答