我有一个“服务器”程序,可以更新共享内存中的许多链接列表以响应外部事件。我希望客户端程序尽快注意到任何列表的更新(最低延迟)。服务器将链表的节点标记state_
为FILLED
一旦其数据被填充并且其下一个指针已设置为有效位置。在那之前,它state_
是NOT_FILLED_YET
。我正在使用内存屏障来确保客户端不会state_
像FILLED
以前一样看到其中的数据实际上已经准备好(而且它似乎有效,我从来没有看到损坏的数据)。此外,state_
它是不稳定的,以确保编译器不会将客户端的检查从循环中解除。
保持服务器代码完全相同,我为客户端提供了 3 种不同的方法来扫描链接列表以进行更改。问题是:为什么第三种方法最快?
方法 1:连续循环遍历所有链表(称为“通道”),查看是否有任何节点已更改为“已填充”:
void method_one()
{
std::vector<Data*> channel_cursors;
for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
{
Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
channel_cursors.push_back(current_item);
}
while(true)
{
for(std::size_t i = 0; i < channel_list.size(); ++i)
{
Data* current_item = channel_cursors[i];
ACQUIRE_MEMORY_BARRIER;
if(current_item->state_ == NOT_FILLED_YET) {
continue;
}
log_latency(current_item->tv_sec_, current_item->tv_usec_);
channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
}
}
}
当通道数量很少时,方法 1 的延迟非常低。但是当频道数量增加(250K+)时,由于循环遍历所有频道,它变得非常慢。于是我试了...
方法二:给每个链表一个ID。在旁边保留一个单独的“更新列表”。每次更新链表之一时,将其 ID 推送到更新列表。现在我们只需要监控单个更新列表,并检查我们从中获得的 ID。
void method_two()
{
std::vector<Data*> channel_cursors;
for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
{
Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
channel_cursors.push_back(current_item);
}
UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
while(true)
{
ACQUIRE_MEMORY_BARRIER;
if(update_cursor->state_ == NOT_FILLED_YET) {
continue;
}
::uint32_t update_id = update_cursor->list_id_;
Data* current_item = channel_cursors[update_id];
if(current_item->state_ == NOT_FILLED_YET) {
std::cerr << "This should never print." << std::endl; // it doesn't
continue;
}
log_latency(current_item->tv_sec_, current_item->tv_usec_);
channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
}
}
方法 2 给出了可怕的延迟。方法 1 可能会给出低于 10 毫秒的延迟,而方法 2 会莫名其妙地经常给出 8 毫秒的延迟!使用 gettimeofday 似乎 update_cursor->state_ 的变化从服务器的视图传播到客户端的视图非常缓慢(我在一个多核机器上,所以我认为延迟是由于缓存造成的)。所以我尝试了一种混合方法......
方法三:保留更新列表。但是不断循环遍历所有通道,并在每次迭代中检查更新列表是否已更新。如果有,请使用推到其上的号码。如果没有,请检查我们当前迭代到的通道。
void method_three()
{
std::vector<Data*> channel_cursors;
for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
{
Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
channel_cursors.push_back(current_item);
}
UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
while(true)
{
for(std::size_t i = 0; i < channel_list.size(); ++i)
{
std::size_t idx = i;
ACQUIRE_MEMORY_BARRIER;
if(update_cursor->state_ != NOT_FILLED_YET) {
//std::cerr << "Found via update" << std::endl;
i--;
idx = update_cursor->list_id_;
update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
}
Data* current_item = channel_cursors[idx];
ACQUIRE_MEMORY_BARRIER;
if(current_item->state_ == NOT_FILLED_YET) {
continue;
}
found_an_update = true;
log_latency(current_item->tv_sec_, current_item->tv_usec_);
channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
}
}
}
此方法的延迟与方法 1 一样好,但可以扩展到大量通道。问题是,我不知道为什么。只是为了解决问题:如果我取消注释“通过更新找到”部分,它会在每个延迟日志消息之间打印。这意味着东西只能在更新列表中找到!所以我不明白这种方法如何比方法2更快。
生成随机字符串作为测试数据的完整可编译代码(需要 GCC 和 boost-1.41)位于: http: //pastebin.com/0kuzm3Uf
更新:所有 3 种方法都有效地自旋锁定,直到发生更新。不同之处在于他们注意到更新发生需要多长时间。它们都不断地对处理器征税,因此这并不能解释速度差异。我正在一台 4 核机器上进行测试,没有其他任何东西在运行,所以服务器和客户端没有什么可竞争的。我什至制作了一个代码版本,其中更新表示条件并让客户端等待条件 - 它无助于任何方法的延迟。
Update2:尽管有 3 种方法,但我一次只尝试了 1 种,因此只有 1 个服务器和 1 个客户端在竞争 state_ 成员。