8

我有一个“服务器”程序,可以更新共享内存中的许多链接列表以响应外部事件。我希望客户端程序尽快注意到任何列表的更新(最低延迟)。服务器将链表的节点标记state_FILLED一旦其数据被填充并且其下一个指针已设置为有效位置。在那之前,它state_NOT_FILLED_YET。我正在使用内存屏障来确保客户端不会state_FILLED以前一样看到其中的数据实际上已经准备好(而且它似乎有效,我从来没有看到损坏的数据)。此外,state_它是不稳定的,以确保编译器不会将客户端的检查从循环中解除。

保持服务器代码完全相同,我为客户端提供了 3 种不同的方法来扫描链接列表以进行更改。问题是:为什么第三种方法最快?

方法 1:连续循环遍历所有链表(称为“通道”),查看是否有任何节点已更改为“已填充”:

void method_one()
{
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
        channel_cursors.push_back(current_item);
    }

    while(true)
    {
        for(std::size_t i = 0; i < channel_list.size(); ++i)
        {   
            Data* current_item = channel_cursors[i];

            ACQUIRE_MEMORY_BARRIER;
            if(current_item->state_ == NOT_FILLED_YET) {
                continue;
            }

            log_latency(current_item->tv_sec_, current_item->tv_usec_);

            channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
        }
    }
}

当通道数量很少时,方法 1 的延迟非常低。但是当频道数量增加(250K+)时,由于循环遍历所有频道,它变得非常慢。于是我试了...

方法二:给每个链表一个ID。在旁边保留一个单独的“更新列表”。每次更新链表之一时,将其 ID 推送到更新列表。现在我们只需要监控单个更新列表,并检查我们从中获得的 ID。

void method_two()
{
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
        channel_cursors.push_back(current_item);
    }

    UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));

    while(true)
    {   
            ACQUIRE_MEMORY_BARRIER;
        if(update_cursor->state_ == NOT_FILLED_YET) {
            continue;
        }

        ::uint32_t update_id = update_cursor->list_id_;

        Data* current_item = channel_cursors[update_id];

        if(current_item->state_ == NOT_FILLED_YET) {
            std::cerr << "This should never print." << std::endl; // it doesn't
            continue;
        }

        log_latency(current_item->tv_sec_, current_item->tv_usec_);

        channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
        update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
    }   
}

方法 2 给出了可怕的延迟。方法 1 可能会给出低于 10 毫秒的延迟,而方法 2 会莫名其妙地经常给出 8 毫秒的延迟!使用 gettimeofday 似乎 update_cursor->state_ 的变化从服务器的视图传播到客户端的视图非常缓慢(我在一个多核机器上,所以我认为延迟是由于缓存造成的)。所以我尝试了一种混合方法......

方法三:保留更新列表。但是不断循环遍历所有通道,并在每次迭代中检查更新列表是否已更新。如果有,请使用推到其上的号码。如果没有,请检查我们当前迭代到的通道。

void method_three()
{
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
        channel_cursors.push_back(current_item);
    }

    UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));

    while(true)
    {
        for(std::size_t i = 0; i < channel_list.size(); ++i)
        {
            std::size_t idx = i;

            ACQUIRE_MEMORY_BARRIER;
            if(update_cursor->state_ != NOT_FILLED_YET) {
                //std::cerr << "Found via update" << std::endl;
                i--;
                idx = update_cursor->list_id_;
                update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
            }

            Data* current_item = channel_cursors[idx];

            ACQUIRE_MEMORY_BARRIER;
            if(current_item->state_ == NOT_FILLED_YET) {
                continue;
            }

            found_an_update = true;

            log_latency(current_item->tv_sec_, current_item->tv_usec_);
            channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
        }
    }
}

此方法的延迟与方法 1 一样好,但可以扩展到大量通道。问题是,我不知道为什么。只是为了解决问题:如果我取消注释“通过更新找到”部分,它会在每个延迟日志消息之间打印。这意味着东西只能在更新列表中找到!所以我不明白这种方法如何比方法2更快。

生成随机字符串作为测试数据的完整可编译代码(需要 GCC 和 boost-1.41)位于: http: //pastebin.com/0kuzm3Uf

更新:所有 3 种方法都有效地自旋锁定,直到发生更新。不同之处在于他们注意到更新发生需要多长时间。它们不断地对处理器征税,因此这并不能解释速度差异。我正在一台 4 核机器上进行测试,没有其他任何东西在运行,所以服务器和客户端没有什么可竞争的。我什至制作了一个代码版本,其中更新表示条件并让客户端等待条件 - 它无助于任何方法的延迟。

Update2:尽管有 3 种方法,但我一次只尝试了 1 种,因此只有 1 个服务器和 1 个客户端在竞争 state_ 成员。

4

4 回答 4

1

假设:方法 2 以某种方式阻止服务器写入更新。

除了处理器内核本身之外,您可以锤炼的一件事是您的连贯缓存。当您读取给定内核上的值时,该内核上的 L1 缓存必须获得对该缓存行的读取访问权限,这意味着它需要使任何其他缓存拥有的对该行的写访问权限无效。反之亦然写一个值。因此,这意味着您在“写入”状态(在服务器核心的缓存中)和“读取”状态(在所有客户端核心的缓存中)之间不断地来回切换缓存行。

x86 缓存性能的复杂性并不是我完全熟悉的,但看起来完全合理(至少在理论上)你正在做的事情是让三个不同的线程尽可能地用 read- 敲击这个内存位置 -访问请求大约是在服务器上创建拒绝服务攻击,有时会在几毫秒内阻止它写入该缓存行。

您可以通过查看服务器将值实际写入更新列表所需的时间进行实验来检测这一点,并查看那里是否存在与延迟相对应的延迟。

您还可以尝试从等式中删除缓存,方法是在单个内核上运行所有内容,以便客户端和服务器线程从同一个 L1 缓存中提取内容。

于 2010-03-28T05:14:05.253 回答
1

我不知道你是否读过 Herb Sutter 的 Concurrency 专栏。它们非常有趣,尤其是当您遇到缓存问题时。

实际上,Method2这里似乎更好,因为 id 通常小于数据,这意味着您不必经常往返于主内存(这很费力)。

然而,实际上可能发生的是你有这样一行缓存:

Line of cache = [ID1, ID2, ID3, ID4, ...]
                  ^         ^
            client          server

然后产生争用。

这是 Herb Sutter 的文章:消除虚假共享。基本思想是简单地人为地增加列表中的 ID,使其完全占用一行缓存。

阅读该系列的其他文章。也许你会得到一些想法。我认为有一个不错的无锁循环缓冲区可以帮助您更新列表:)

于 2010-03-28T14:25:59.800 回答
0

我注意到在方法 1 和方法 3 中都有一行,ACQUIRE_MEMORY_BARRIER我认为这与多线程/竞争条件有关?

无论哪种方式,方法 2 都没有任何睡眠,这意味着以下代码......

while(true)
{   
    if(update_cursor->state_ == NOT_FILLED_YET) {
        continue;
    }

将锤击处理器。执行这种生产者/消费者任务的典型方法是使用某种信号量向阅读器发出更新列表已更改的信号。搜索生产者/消费者多线程应该会给你大量的例子。这里的主要思想是,这允许线程在等待 update_cursor->state 更改时进入睡眠状态。这可以防止该线程窃取所有 cpu 周期。

于 2010-03-28T02:59:17.670 回答
0

答案很难弄清楚,公平地说,我提供的信息很难,但如果有人真的编译了我提供的源代码,他们就有机会;)我说“通过更新列表找到”被打印出来了在每条延迟日志消息之后,但这实际上并非如此——仅在我可以在终端中回滚时才如此。一开始有大量的更新没有使用更新列表。

问题是,在我在更新列表中设置起点和在每个数据列表中设置起点之间,会有一些滞后,因为这些操作需要时间。请记住,列表在整个过程中都在增长。考虑最简单的情况,我有 2 个数据列表 A 和 B。当我在更新列表中设置起点时,其中恰好有 60 个元素,因为列表 A 上有 30 个更新,列表 B 有 30 个更新。说他们已经交替:

A
B
A
B
A // and I start looking at the list here
B

但是在我将更新列表设置到那里之后,B 有大量更新,而 A 没有更新。然后我在每个数据列表中设置了我的起始位置。我的数据列表的起点将是那次更新激增之后,但我在更新列表中的起点是在那次激增之前,所以现在我要检查一堆更新而不找到它们。上面的混合方法效果最好,因为通过在找不到更新时迭代所有元素,它可以快速缩小更新列表所在位置与数据列表所在位置之间的时间差距。

于 2010-04-14T21:52:21.343 回答