不幸的是,TCP 不能传输消息,只能传输字节流。如果要传输消息,则必须在顶部应用协议。高性能的最佳协议是那些使用健全性可检查标头指定消息长度的协议 - 这允许您将正确数量的数据直接读取到合适的缓冲区对象中,而无需逐字节迭代数据以寻找结束 -消息字符。然后可以将缓冲区指针排队到另一个线程,并为下一条消息创建/分离一个新的缓冲区对象。这避免了批量数据的任何复制,并且对于大型消息来说,足够高效,使用非阻塞队列作为消息对象指针有点毫无意义。
下一个可用的优化是池化对象 *buffers 以避免持续的 new/dispose,回收消费者线程中的 *buffers 以便在网络接收线程中重新使用。使用 ConcurrentQueue 很容易做到这一点,如果池暂时清空,最好阻塞以允许流控制而不是数据损坏或段错误/AV。
接下来,在每个 *buffer 数据成员的开头添加一个 [cacheline size] 'dead-zone',以防止任何线程与任何其他线程虚假共享数据。
结果应该是一个完整的消息的高带宽流到消费者线程,具有非常小的延迟、CPU 浪费或缓存抖动。您的所有 24 个内核都可以在不同的数据上运行。
在多线程应用程序中复制大量数据是对糟糕设计和失败的承认。
跟进..
听起来您因为协议不同而无法迭代数据:(
无虚假共享的 PDU 缓冲区对象,例如:
typedef struct{
char deadZone[256]; // anti-false-sharing
int dataLen;
char data[8388608]; // 8 meg of data
} SbufferData;
class TdataBuffer: public{
private:
TbufferPool *myPool; // reference to pool used, in case more than one
EpduState PDUstate; // enum state variable used to decode protocol
protected:
SbufferData netData;
public:
virtual reInit(); // zeros dataLen, resets PDUstate etc. - call when depooling a buffer
virtual int loadPDU(char *fromHere,int len); // loads protocol unit
release(); // pushes 'this' back onto 'myPool'
};
loadPDU 得到一个指向原始网络数据长度的指针。它返回 0 - 意味着它还没有完全组装一个 PDU,或者它从原始网络数据中吃掉的字节数来完全组装一个 PDU,在这种情况下,将它排队,分离另一个并调用 loadPDU()使用未使用的剩余原始数据,然后继续输入下一个原始数据。
如果需要,您可以使用不同派生缓冲区类的不同池来服务不同的协议 - TbufferPool[Eprotocols] 数组。TbufferPool 可能只是一个 BlockingCollection 队列。管理变得几乎是微不足道的——缓冲区可以发送到系统周围的队列中,发送到 GUI 以显示统计信息,然后可能发送到记录器,只要在队列链的末尾调用 release()。
显然,一个“真正的”PDU 对象可能会加载更多的方法、数据联合/结构、迭代器和一个状态引擎来操作协议,但无论如何这是基本思想。主要的是易于管理、封装,并且由于没有两个线程可以在同一个缓冲区实例上操作,因此解析/访问数据不需要锁定/同步。
哦,是的,而且由于没有队列必须保持锁定的时间超过推送/弹出一个指针所需的时间,因此实际争用的机会非常低——即使是传统的阻塞队列也几乎不需要使用内核锁定。