5

我必须从多播 (UDP) 流中读取一些数据(以惊人的速度 - 每秒最多 5000 条消息)。因为流是多播的(并且数据非常关键),数据提供者提供了两个发送相同数据的流(它们的逻辑是相同数据包在两个流中丢失的可能性非常接近于零)。所有数据包都标有序列号以进行跟踪。

此外,该应用程序对时间非常关键,以至于我不得不并行收听两个流并从它首先收到的任何多播流中获取下一个序列号 - 当相同的数据包出现在镜像流上时,我只是将其丢弃.

我计划在两个函数之间使用一个通用的“sequence_number”变量来实现这个drop特性——顺便说一下,这两个函数在不同的线程中运行。序列号是atomic因为它将从两个不同的线程中读取和更新。

想到的明显算法是

if (sequence number received from the stream > sequence_number)
{
   process packet;
   sequence_number = sequence number received from the stream;
}

(上述算法需要在序列号乱序时进行修改——它们可以作为 UDP 流——但暂时忘记它)

我的问题是这样的:

std::load我开始sequence_number,检查它是否小于我从流中接收到的序列号,接受数据包,最后 std::store将新的序列号改为sequence_number;如果另一个流接收到相同的数据包(具有相同的序列号)并执行相同的操作(在第一个流完成std::store该序列号之前),我基本上会在我的系统中两次收到相同的数据包。有什么办法可以克服这种情况?

4

3 回答 3

2

不要再担心处理乱序数据包,因为解决这个问题也为同步线程提供了最优雅的解决方案。

数组的元素是用于数据竞争的唯一内存位置。如果您根据其序列号将每个数据包(通过指针写入原子地)放入不同的数组元素中,您将摆脱大部分争用。还使用 compare-exchange 来检测其他线程(其他流)是否已经看到该数据包。

请注意,您不会有通常与 compare-exchange 关联的重试循环,或者您拥有数据包的第一个副本并且 compare-exchange 成功,或者数据包已经存在并且您的副本可以被丢弃。所以这种方法不仅无锁而且无等待:)

于 2014-07-19T18:22:18.577 回答
1

这是一种选择,如果您使用的是std::atomic值,请使用compare_exchange.

未显示如何初始化last_processed_seqnum,因为您需要将其设置为有效值,即比要到达的下一个数据包的 seqnum 小一。

它需要适应存在序列号间隙的情况。作为前提的一部分,您提到不会丢失序列号;但是下面的示例将在任何 seqnum 间隙时停止处理数据包(即灾难性地失败)。

std::atomic<int> last_processed_seqnum;
// sync last_processed_seqnum to first message(s).

int seqnum_from_stream = ...;
int putative_last_processed_seqnum = seqnum_from_stream - 1;


if (last_processed_seqnum.compare_exchange_strong(putative_last_processed_seqnum,
                                                  seqnum_from_stream))
{
   // sequence number has been updated in compare_exchange_strong
   // process packet;
} 

理想情况下,我们想要的是一个compare_exchange使用大于而不是等于的函数。我不知道有什么方法可以在一次操作中实现这种行为。我链接到的 SO 问题链接到有关迭代所有小于要更新的​​目标的值的答案。

于 2014-07-19T17:35:32.137 回答
0

您可能正在实施价格馈送处理程序,它是哪个交易所以及什么协议?是 ITCH 还是 FIX Fast?我不会为同一个提要推荐两个线程,因为您可能必须为不同的细分市场/板加入多个多播组。

于 2014-07-19T20:20:23.297 回答