我正在解决FAST协议的两个提要仲裁问题。如果您不熟悉它,请不要担心,我的问题实际上很笼统。但我正在为感兴趣的人添加问题描述(你可以跳过它)。
所有 UDP 源中的数据都在两个不同的多播 IP 上的两个相同的源(A 和 B)中传播。强烈建议客户端接收并处理这两个提要,因为可能会丢失 UDP 数据包。处理两个相同的提要允许一个在统计上降低丢包的概率。未指定消息首次出现在哪个特定提要(A 或 B)中。要仲裁这些提要,应该使用在 Preamble 或标签 34-MsgSeqNum 中找到的消息序列号。Preamble 的使用允许在不解码 FAST 消息的情况下确定消息序列号。应使用以下算法处理来自提要 A 和 B 的消息:
- 收听提要 A 和 B
- 根据消息的序号处理消息。
- 如果之前已经处理过具有相同序列号的消息,则忽略该消息。
如果出现序列号间隙,则表明两个馈送(A 和 B)中的数据包丢失。客户端应启动其中一个恢复过程。但是首先客户端应该等待一个合理的时间,也许丢失的数据包会因为数据包重新排序而稍晚出现。UDP 协议不能保证数据包按顺序传送。
// tcp 恢复算法进一步
我写了这么简单的类。它预先分配所有需要的类,然后接收特定的第一个线程seqNum
可以处理它。另一个线程稍后将删除它:
class MsgQueue
{
public:
MsgQueue();
~MsgQueue(void);
bool Lock(uint32_t msgSeqNum);
Msg& Get(uint32_t msgSeqNum);
void Commit(uint32_t msgSeqNum);
private:
void Process();
static const int QUEUE_LENGTH = 1000000;
// 0 - available for use; 1 - processing; 2 - ready
std::atomic<uint16_t> status[QUEUE_LENGTH];
Msg updates[QUEUE_LENGTH];
};
执行:
MsgQueue::MsgQueue()
{
memset(status, 0, sizeof(status));
}
MsgQueue::~MsgQueue(void)
{
}
// For the same msgSeqNum should return true to only one thread
bool MsgQueue::Lock(uint32_t msgSeqNum)
{
uint16_t expected = 0;
return status[msgSeqNum].compare_exchange_strong(expected, 1);
}
void MsgQueue::Commit(uint32_t msgSeqNum)
{
status[msgSeqNum] = 2;
Process();
}
// this method probably should be combined with "Lock" but please ignore! :)
Msg& MsgQueue::Get(uint32_t msgSeqNum)
{
return updates[msgSeqNum];
}
void MsgQueue::Process()
{
// ready packets must be processed,
}
用法:
if (!msgQueue.Lock(seq)) {
return;
}
Msg msg = msgQueue.Get(seq);
msg.Ticker = "HP"
msg.Bid = 100;
msg.Offer = 101;
msgQueue.Commit(seq);
如果我们假设 QUEUE_LENGTH 是无穷大,这很好用。因为在这种情况下,一个 msgSeqNum = 一个updates
数组项。
但是我必须使缓冲区循环,因为不可能存储整个历史记录(数百万个数据包)并且没有理由这样做。实际上我需要缓冲足够的数据包来重建会话,一旦重建会话,我就可以丢弃它们。
但是拥有循环缓冲区会使算法变得非常复杂。例如,假设我们有长度为 1000 的循环缓冲区。同时我们尝试处理 seqNum = 10 000 和 seqNum = 11 000(这非常不可能,但仍然可能)。这两个数据包都将映射到updates
索引处的数组,0
因此会发生冲突。在这种情况下,缓冲区应该“丢弃”旧数据包并处理新数据包。
实现我想要使用的东西很简单,但是在不同线程使用的循环缓冲区上locks
编写代码确实很复杂。lock-free
所以我欢迎任何建议和建议如何做到这一点。谢谢!