78

我正在设计一个连接到一个或多个数据馈送流的系统,并对数据进行一些分析,而不是根据结果触发事件。在典型的多线程生产者/消费者设置中,我将有多个生产者线程将数据放入队列,多个消费者线程读取数据,消费者只对最新的数据点加上 n 个点感兴趣。如果慢的消费者跟不上生产者线程将不得不阻塞,当然当没有未处理的更新时消费者线程将阻塞。使用带有读/写锁的典型并发队列会很好地工作,但数据进入的速度可能会很大,所以我想减少我的锁定开销,尤其是生产者的写锁。我认为我需要一个循环无锁缓冲区。

现在有两个问题:

  1. 循环无锁缓冲区是答案吗?

  2. 如果是这样,在我推出自己的产品之前,你知道任何符合我需要的公共实施吗?

任何实现循环无锁缓冲区的指针总是受欢迎的。

顺便说一句,在 Linux 上用 C++ 执行此操作。

一些附加信息:

响应时间对我的系统至关重要。理想情况下,消费者线程将希望尽快看到任何更新,因为额外的 1 毫秒延迟可能会使系统一文不值,或者价值大大降低。

我倾向于的设计思想是一个半无锁循环缓冲区,其中生产者线程尽可能快地将数据放入缓冲区中,让我们调用缓冲区 A 的头部,除非缓冲区已满,否则不会阻塞,当A 与缓冲区 Z 的末端相遇。消费者线程将分别持有两个指向循环缓冲区的指针 P 和 P n,其中 P 是线程的本地缓冲区头,P n是 P 之后的第 n 项。每个消费者线程将推进其 P和 P n一旦它完成处理当前 P 并且缓冲区指针 Z 的末尾以最慢的 P n前进. 当 P 赶上 A 时,这意味着不再需要处理新的更新,消费者旋转并忙于等待 A 再次前进。如果消费者线程旋转时间过长,它可以进入睡眠状态并等待条件变量,但我可以接受消费者占用 CPU 周期等待更新,因为这不会增加我的延迟(我将拥有更多的 CPU 内核比线程)。想象一下你有一个环形轨道,生产者跑在一堆消费者前面,关键是调系统,让生产者通常跑在消费者前面几步,而这些操作大部分都可以使用无锁技术完成。我知道要正确执行实施的细节并不容易……好吧,非常难,这就是为什么我想在自己犯一些错误之前从别人的错误中吸取教训。

4

18 回答 18

45

在过去的几年里,我对无锁数据结构进行了特别的研究。我已经阅读了该领域的大部分论文(只有大约 40 篇左右 - 尽管只有大约 10 到 15 篇真正有用:-)

AFAIK,尚未发明无锁循环缓冲区。问题将是处理读者超过作者或反之亦然的复杂情况。

如果您至少有六个月的时间没有学习无锁数据结构,请不要尝试自己编写。你会弄错,并且在你的代码在新平台上部署后失败之前,你可能并不明显存在错误。

但是我相信有一个解决方案可以满足您的要求。

您应该将无锁队列与无锁空闲列表配对。

free-list 将为您提供预分配,因此消除了对无锁分配器的(财政上昂贵的)要求;当空闲列表为空时,您可以通过立即从队列中取出一个元素并使用它来复制循环缓冲区的行为。

(当然,在基于锁的循环缓冲区中,一旦获得锁,获得一个元素非常快——基本上只是一个指针解引用——但在任何无锁算法中你都不会得到它;他们经常不得不去很好地做事;失败的自由列表弹出然后出队的开销与任何无锁算法需要做的工作量相当)。

早在 1996 年,Michael 和 Scott 就开发了一个非常好的无锁队列。下面的链接将为您提供足够的详细信息来追踪他们论文的 PDF;迈克尔和斯科特,先进先出

无锁空闲列表是最简单的无锁算法,实际上我认为我没有看过关于它的实际论文。

于 2009-05-20T21:09:38.507 回答
35

您想要的艺术术语是无锁队列。有一组很棒的笔记,其中包含Ross Bencina的代码和论文链接。我最信任他的工作的人是莫里斯·赫利希(对于美国人来说,他的名字发音为“莫里斯”)。

于 2009-05-16T01:09:29.683 回答
11

如果缓冲区为空或已满,则生产者或消费者阻塞的要求建议您应该使用正常的锁定数据结构,使用信号量或条件变量使生产者和消费者阻塞,直到数据可用。无锁代码通常不会在这种情况下阻塞 - 它会旋转或放弃无法完成的操作,而不是使用操作系统进行阻塞。(如果你可以等到另一个线程产生或消费数据,那为什么还要等待另一个线程完成更新数据结构的锁更糟糕呢?)

在 (x86/x64) Linux 上,如果没有争用,使用互斥锁的线程内同步相当便宜。专注于最大限度地减少生产者和消费者需要锁定的时间。鉴于您已经说过您只关心最后 N 个记录的数据点,我认为循环缓冲区可以很好地做到这一点。但是,我真的不明白这如何符合阻塞要求以及消费者实际消费(删除)他们读取的数据的想法。(您是否希望消费者只查看最后 N 个数据点而不删除它们?您是否希望生产者不关心消费者是否跟不上,而只是覆盖旧数据?)

此外,正如 Zan Lynx 评论的那样,当您有大量数据进入时,您可以将数据聚合/缓冲成更大的块。您可以缓冲固定数量的点,或者在一定时间内接收到的所有数据. 这意味着将有更少的同步操作。不过,它确实会引入延迟,但如果您不使用实时 Linux,那么无论如何您都必须在一定程度上处理它。

于 2009-05-16T00:44:33.503 回答
7

boost库中的实现值得考虑。它易于使用且性能相当高。我编写了一个测试并在一台四核 i7 笔记本电脑(8 个线程)上运行它,每秒获得约 4M 的入队/出队操作。到目前为止未提及的另一个实现是http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue上的 MPMC 队列。我在有 32 个生产者和 32 个消费者的同一台笔记本电脑上对这个实现做了一些简单的测试。正如宣传的那样,它比 boost 无锁队列更快。

由于大多数其他答案表明无锁编程很难。大多数实现都很难检测到需要大量测试和调试才能修复的极端情况。这些通常通过在代码中仔细放置内存屏障来解决。您还将在许多学术文章中找到正确性证明。我更喜欢使用蛮力工具测试这些实现。您计划在生产中使用的任何无锁算法都应使用http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html之类的工具检查其正确性。

于 2015-09-13T17:49:00.917 回答
6

在 DDJ 上有很多关于此的系列文章。作为这个东西有多难的标志,它是对早期文章的更正,它弄错了。确保在自己动手之前了解错误)-;

于 2009-05-16T00:02:06.617 回答
5

萨特的队列是次优的,他知道这一点。The Art of Multicore programming 是一个很好的参考,但不要相信内存模型上的 Java 人。罗斯的链接不会给你明确的答案,因为他们的图书馆有这样的问题等等。

做无锁编程是自找麻烦,除非你想在解决问题之前花很多时间在你明显过度设计的事情上(从它的描述来看,这是一种“寻求完美”的常见疯狂' 在缓存一致性中)。这需要数年时间,导致不能先解决问题,然后再优化,这是一种常见病。

于 2009-05-16T23:31:58.130 回答
5

减少争用的一种有用技术是将项目散列到多个队列中,并让每个消费者专注于一个“主题”。

对于您的消费者感兴趣的最新数量的项目 - 您不想锁定整个队列并遍历它以找到要覆盖的项目 - 只需在 N 元组中发布项目,即所有 N 个最近的项目。实现的奖励点,生产者将阻塞完整队列(当消费者无法跟上时)超时,更新其本地元组缓存 - 这样您就不会对数据源施加背压。

于 2009-05-16T03:37:43.227 回答
5

我不是硬件内存模型和无锁数据结构的专家,我倾向于避免在我的项目中使用它们,我使用传统的锁定数据结构。

但是我最近注意到视频: 基于环形缓冲区的无锁 SPSC 队列

这是基于交易系统使用的名为 LMAX distruptor 的开源高性能 Java 库:LMAX Distruptor

根据上面的介绍,您可以使头指针和尾指针原子化,并以原子方式检查头从后面抓住尾巴的情况,反之亦然。

您可以在下面看到一个非常基本的 C++11 实现:

// USING SEQUENTIAL MEMORY
#include<thread>
#include<atomic>
#include <cinttypes>
using namespace std;

#define RING_BUFFER_SIZE 1024  // power of 2 for efficient %
class lockless_ring_buffer_spsc
{
    public :

        lockless_ring_buffer_spsc()
        {
            write.store(0);
            read.store(0);
        }

        bool try_push(int64_t val)
        {
            const auto current_tail = write.load();
            const auto next_tail = increment(current_tail);
            if (next_tail != read.load())
            {
                buffer[current_tail] = val;
                write.store(next_tail);
                return true;
            }

            return false;  
        }

        void push(int64_t val)
        {
            while( ! try_push(val) );
            // TODO: exponential backoff / sleep
        }

        bool try_pop(int64_t* pval)
        {
            auto currentHead = read.load();

            if (currentHead == write.load())
            {
                return false;
            }

            *pval = buffer[currentHead];
            read.store(increment(currentHead));

            return true;
        }

        int64_t pop()
        {
            int64_t ret;
            while( ! try_pop(&ret) );
            // TODO: exponential backoff / sleep
            return ret;
        }

    private :
        std::atomic<int64_t> write;
        std::atomic<int64_t> read;
        static const int64_t size = RING_BUFFER_SIZE;
        int64_t buffer[RING_BUFFER_SIZE];

        int64_t increment(int n)
        {
            return (n + 1) % size;
        }
};

int main (int argc, char** argv)
{
    lockless_ring_buffer_spsc queue;

    std::thread write_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.push(i);
             }
         }  // End of lambda expression
                                                );
    std::thread read_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.pop();
             }
         }  // End of lambda expression
                                                );
    write_thread.join();
    read_thread.join();

     return 0;
}
于 2015-06-17T14:30:00.860 回答
4

我同意这篇文章并建议不要使用无锁数据结构。一篇相对较新的关于无锁fifo队列的论文是this,搜索相同作者的更多论文;还有一篇关于 Chalmers 的关于无锁数据结构的博士论文(我失去了链接)。但是,您并没有说您的元素有多大——无锁数据结构仅适用于字大小的项目,因此如果元素大于机器字(32 或 64),则必须动态分配元素位)。如果您动态分配元素,则将(假设,因为您没有分析您的程序并且您基本上正在进行过早优化)瓶颈转移到内存分配器,因此您需要一个无锁内存分配器,例如Streamflow,并将其与您的应用程序集成。

于 2009-05-16T05:29:44.923 回答
4

这是一个旧线程,但由于尚未提及 - 在 JUCE C++ 框架中有一个无锁、循环、1 个生产者 -> 1 个消费者、FIFO 可用。

https://www.juce.com/doc/classAbstractFifo#details

于 2016-04-19T08:54:01.030 回答
4

虽然这是一个老问题,但没有人提到DPDK的无锁环形缓冲区。它是一个高吞吐量的环形缓冲区,支持多个生产者和多个消费者。它还提供了单消费者和单生产者模式,并且在SPSC模式下环形缓冲区是免等待的。它是用 C 语言编写的,支持多种架构。

此外,它支持 Bulk 和 Burst 模式,其中项目可以批量入队/出队。该设计让多个消费者或多个生产者同时写入队列,只需通过移动原子指针来保留空间。

于 2017-09-15T19:41:03.083 回答
3

前段时间,我找到了一个很好的解决这个问题的方法。我相信它是迄今为止发现的最小的。

存储库有一个示例,说明如何使用它创建 N 个线程(读取器和写入器),然后共享一个席位。

我在测试示例上做了一些基准测试,得到了以下结果(以百万次操作/秒为单位):

按缓冲区大小

吞吐量

按线程数

在此处输入图像描述

请注意线程数如何不改变吞吐量。

我认为这是这个问题的最终解决方案。它的工作原理令人难以置信的快速和简单。即使有数百个线程和单个位置的队列。它可以用作线程之间的管道,在队列内分配空间。

你能打破它吗?

于 2019-12-31T03:32:51.220 回答
2

只是为了完整性:在OtlContainers中有经过良好测试的无锁循环缓冲区,但它是用 Delphi 编写的(TOmniBaseBoundedQueue 是循环缓冲区, TOmniBaseBoundedStack 是有界堆栈)。在同一单元(TOmniBaseQueue)中还有一个无界队列。无界队列在动态无锁队列中进行了描述——做对了。有界队列(循环缓冲区)的初始实现在无锁队列中有所描述,终于!但此后代码已更新。

于 2010-03-11T15:22:22.317 回答
2

查看Disruptor如何使用它),它是一个可供多个线程订阅的环形缓冲区:

于 2012-03-21T08:53:08.663 回答
1

这是我的做法:

  • 将队列映射到数组
  • 使用下一次读取和下一次写入索引保持状态
  • 保留一个空的完整位向量

插入包括使用具有增量的 CAS 并在下一次写入时翻转。一旦你有一个插槽,添加你的值,然后设置与之匹配的空/满位。

删除需要在测试下溢之前检查该位,但除此之外,与写入相同,但使用读取索引并清除空/满位。

被警告,

  1. 我不是这些方面的专家
  2. 当我使用原子 ASM 操作时,它们似乎非常慢,因此如果您最终使用的操作不止几个,那么使用嵌入/删除函数中的锁可能会更快。理论是,单个原子操作来获取锁,然后(非常)少数非原子 ASM 操作可能比几个原子操作完成的相同操作更快。但是要完成这项工作需要手动或自动内联,所以这只是 ASM 的一小部分。
于 2009-05-20T21:33:38.333 回答
1

在某些情况下,您不需要锁定来防止竞争条件,尤其是当您只有一个生产者和消费者时。

考虑LDD3中的这段:

如果仔细实施,循环缓冲区在没有多个生产者或消费者的情况下不需要锁定。生产者是唯一允许修改写入索引及其指向的数组位置的线程。只要写入器在更新写入索引之前将新值存储到缓冲区中,读取器将始终看到一致的视图。反过来,阅读器是唯一可以访问读取索引及其指向的值的线程。稍加注意以确保两个指针不会相互溢出,生产者和消费者可以同时访问缓冲区,而不会出现竞争条件。

于 2018-11-29T09:39:05.203 回答
1

你可以试试lfqueue

使用简单,圆形设计无锁

int *ret;

lfqueue_t results;

lfqueue_init(&results);

/** Wrap This scope in multithread testing **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
while (lfqueue_enq(&results, int_data) != 1) ;

/*Dequeue*/
while ( (ret = lfqueue_deq(&results)) == NULL);

// printf("%d\n", *(int*) ret );
free(ret);
/** End **/

lfqueue_clear(&results);
于 2018-07-24T14:32:51.390 回答
0

如果您以缓冲区永远不会变满为先决条件,请考虑使用此无锁算法:

capacity must be a power of 2
buffer = new T[capacity] ~ on different cache line
mask = capacity - 1
write_index ~ on different cache line
read_index ~ on different cache line

enqueue:
    write_i = write_index.fetch_add(1) & mask
    buffer[write_i] = element ~ release store

dequeue:
    read_i = read_index.fetch_add(1) & mask
    element
    while ((element = buffer[read_i] ~ acquire load) == NULL) {
        spin loop
    }
    buffer[read_i] = NULL ~ relaxed store
    return element
于 2020-07-17T20:14:39.197 回答