88

我一直在用谷歌搜索 C++ 中的无锁队列。我找到了一些代码和一些试验——但我无法编译。也欢迎使用无锁哈希。

摘要:到目前为止,我没有肯定的答案。没有“生产就绪”的库,令人惊讶的是,现有的库都不符合 STL 容器的 API。

4

15 回答 15

41

从 1.53 开始,boost 提供了一组无锁数据结构,包括队列、堆栈和单生产者/单消费者队列(即环形缓冲区)。

于 2013-02-18T12:53:06.720 回答
25

起点可以是 Herb Sutter 的针对单个生产者和消费者多个的 DDJ 文章。他给出的代码(从每篇文章的第二页开始)使用 C++0x 风格的 atomic<T> 模板类型;您可以使用 Boost 进程间库进行模仿。

boost 代码隐藏在进程间库的深处,但是在阅读了适当的头文件 (atomic.hpp) 后,我熟悉的系统上必要的比较和交换操作的实现看起来不错。

于 2009-07-22T09:14:38.097 回答
17

是的!

写了一个无锁队列。它具有功能™:

  • 完全免等待(无 CAS 循环)
  • 超快(每秒超过一亿次入队/出队操作)
  • 使用 C++11 移动语义
  • 根据需要增长(但前提是你想要它)
  • 对元素进行无锁内存管理(使用预分配的连续块)
  • 独立(两个标题加上许可证和自述文件)
  • 在 MSVC2010+、Intel ICC 13 和 GCC 4.7.2 下编译(并且应该在任何 C++11 完全兼容的编译器下工作)

可以在简化的 BSD 许可下在 GitHub 上获得(随意分叉它!)。

注意事项:

  • 仅适用于单生产者单消费者架构(即两个线程)
  • 在 x86(-64) 上进行了全面测试,并且应该可以在 ARM、PowerPC 和其他 CPU 上工作,其中对齐的本机大小的整数和指针加载和存储自然是原子的,但尚未在非 x86 CPU 上进行现场测试(如果有人有一个测试它让我知道)
  • 不知道是否侵犯了任何专利(使用风险自负等)。请注意,我确实从头开始自己设计和实现了它。
于 2013-05-06T21:56:09.220 回答
17

Facebook 的Folly似乎有基于 C++11 的无锁数据结构<atomic>

我敢说这些目前在生产中使用,所以我想它们可以安全地用于其他项目。

干杯!

于 2013-04-15T15:51:24.303 回答
11

有这样一个库,但它是在 C 中的。

包装到 C++ 应该很简单。

liblfds

于 2009-10-17T10:18:24.150 回答
10

boost.lockfree 是一种尝试创建无锁堆栈和 fifo 类的 c++ 实现。

公共 git 存储库

于 2009-08-28T10:21:57.697 回答
9

在检查了大部分给定的答案后,我只能说:

答案是否定的。

没有这样的东西可以开箱即用。

于 2009-08-27T19:51:30.503 回答
6

我所知道的最接近的是Windows Interlocked Single Linked Lists。当然,它只是 Windows。

于 2009-10-09T16:06:18.293 回答
5

如果您有一个多生产者/单消费者队列/FIFO,您可以使用 SLIST 或一个普通的无锁 LIFO 堆栈轻松创建一个无锁。您所做的是为消费者提供第二个“私有”堆栈(为简单起见,也可以作为 SLIST 或您选择的任何其他堆栈模型来完成)。消费者从私有堆栈中弹出项目。每当私有 LIFO 耗尽时,您执行 Flush 而不是从共享并发 SLIST 弹出(获取整个 SLIST 链),然后按顺序遍历 Flushed 列表将项目推入私有堆栈。

这适用于单一生产者/单一消费者和多生产者/单一消费者。

但是,它不适用于多消费者情况(使用单个生产者或多个生产者)。

此外,就哈希表而言,它们是“条带化”的理想候选者,它只是将哈希划分为每个缓存段都有一个锁的段。这就是 Java 并发库的工作方式(使用 32 条带)。如果您有一个轻量级的读写器锁,则可以同时访问哈希表以进行同时读取,并且您只会在有争议的条带上发生写入时才会停止(并且可能如果您允许增长哈希表)。

如果您自己滚动,请确保将您的锁与哈希条目交错,而不是将所有锁放在一个彼此相邻的数组中,这样您就不太可能出现错误共享。

于 2009-10-09T16:00:12.877 回答
4

我可能来晚了一点。

没有解决方案(在提出问题时)主要是由于 C++ 中的一个重要问题(在 C++0x/11 之前):C++ 没有(有)并发内存模型。

现在,使用 std::atomic,您可以控制内存排序问题并进行适当的比较和交换操作。我使用 C++11 和 Micheal 的危险指针 (IEEE TPDS 2004) 为自己编写了一个 Micheal&Scott 的无锁队列 (PODC96) 的实现,以避免早期释放和 ABA 问题。它工作正常,但它是一个快速而肮脏的实现,我对实际性能不满意。代码在 bitbucket 上可用:LockFreeExperiment

也可以使用双字 CAS 实现无危险指针的无锁队列(但 64 位版本只能在 x86-64 上使用 cmpxchg16b 实现),我有一篇关于此的博客文章(队列代码未经测试)here :为 x86/x86-64 实现通用双字比较和交换(伦敦政治经济学院博客。)

我自己的基准测试告诉我,双锁定队列(也在 Micheal&Scott 1996 论文中)的性能与无锁队列一样好(我没有达到足够的争用,因此锁定的数据结构存在性能问题,但是我的工作台太轻了现在)并且来自英特尔 TBB 的并发队列似乎更好(快两倍)对于一个相对较小的数字(取决于操作系统,在 FreeBSD 9 下,我到目前为止找到的最低限度,这个数字是 8 个线程i7 有 4 个 ht 核,因此有 8 个逻辑 CPU)线程并且具有非常奇怪的行为(我的简单基准测试的执行时间从几秒变为几小时!)

遵循 STL 风格的无锁队列的另一个限制:在无锁队列上设置迭代器是没有意义的。

于 2013-07-30T19:26:25.187 回答
3

然后英特尔线程构建模块出现了。有一段时间,这很好。

PS:您正在寻找 concurrent_queue 和 concurrent_hash_map

于 2009-07-22T12:18:12.123 回答
1

以下内容来自 Herb Sutter 关于并发无锁队列的文章http://www.drdobbs.com/parallel/writing-a-generalized-concurrent-queue/211601363?pgno=1。我做了一些改变,比如编译器重新排序的东西。需要 GCC v4.4+ 来编译此代码。

#include <atomic>
#include <iostream>
using namespace std;

//compile with g++ setting -std=c++0x

#define CACHE_LINE_SIZE 64

template <typename T>
struct LowLockQueue {
private:
    struct Node {
    Node( T* val ) : value(val), next(nullptr) { }
    T* value;
    atomic<Node*> next;
    char pad[CACHE_LINE_SIZE - sizeof(T*)- sizeof(atomic<Node*>)];
    };
    char pad0[CACHE_LINE_SIZE];

// for one consumer at a time
    Node* first;

    char pad1[CACHE_LINE_SIZE
          - sizeof(Node*)];

// shared among consumers
    atomic<bool> consumerLock;

    char pad2[CACHE_LINE_SIZE
          - sizeof(atomic<bool>)];

// for one producer at a time
    Node* last;

    char pad3[CACHE_LINE_SIZE
          - sizeof(Node*)];

// shared among producers
    atomic<bool> producerLock;

    char pad4[CACHE_LINE_SIZE
          - sizeof(atomic<bool>)];

public:
    LowLockQueue() {
    first = last = new Node( nullptr );
    producerLock = consumerLock = false;
    }
    ~LowLockQueue() {
    while( first != nullptr ) {      // release the list
        Node* tmp = first;
        first = tmp->next;
        delete tmp->value;       // no-op if null
        delete tmp;
    }
    }

    void Produce( const T& t ) {
    Node* tmp = new Node( new T(t) );
    asm volatile("" ::: "memory");                            // prevent compiler reordering
    while( producerLock.exchange(true) )
        { }   // acquire exclusivity
    last->next = tmp;         // publish to consumers
    last = tmp;             // swing last forward
    producerLock = false;       // release exclusivity
    }

    bool Consume( T& result ) {
    while( consumerLock.exchange(true) )
        { }    // acquire exclusivity
    Node* theFirst = first;
    Node* theNext = first-> next;
    if( theNext != nullptr ) {   // if queue is nonempty
        T* val = theNext->value;    // take it out
        asm volatile("" ::: "memory");                            // prevent compiler reordering
        theNext->value = nullptr;  // of the Node
        first = theNext;          // swing first forward
        consumerLock = false;             // release exclusivity
        result = *val;    // now copy it back
        delete val;       // clean up the value
        delete theFirst;      // and the old dummy
        return true;      // and report success
    }
    consumerLock = false;   // release exclusivity
    return false;                  // report queue was empty
    }
};

int main(int argc, char* argv[])
{
    //Instead of this Mambo Jambo one can use pthreads in Linux to test comprehensively
LowLockQueue<int> Q;
Q.Produce(2);
Q.Produce(6);

int a;
Q.Consume(a);
cout<< a << endl;
Q.Consume(a);
cout<< a << endl;

return 0;
}
于 2012-09-14T13:25:34.850 回答
1

据我所知,目前还没有这样的公开可用的东西。实现者需要解决的一个问题是您需要一个无锁内存分配器,它存在,但我现在似乎找不到链接。

于 2009-07-22T12:14:56.270 回答
0

我找到了另一个用 c 编写的解决方案:

http://www.ddj.com/hpc-high-performance-computing/219500200

于 2009-10-21T09:13:10.443 回答
0

我可能在 2010 年的某个时候写了这篇文章,我确信在不同参考资料的帮助下。它是多生产者单一消费者。

template <typename T>
class MPSCLockFreeQueue 
{
private:
    struct Node 
    {
        Node( T val ) : value(val), next(NULL) { }
        T value;
        Node* next;
    };
    Node * Head;               
    __declspec(align(4)) Node * InsertionPoint;  //__declspec(align(4)) forces 32bit alignment this must be changed for 64bit when appropriate.

public:
    MPSCLockFreeQueue() 
    {
        InsertionPoint = new Node( T() );
        Head = InsertionPoint;
    }
    ~MPSCLockFreeQueue() 
    {
        // release the list
        T result;
        while( Consume(result) ) 
        {   
            //The list should be cleaned up before the destructor is called as there is no way to know whether or not to delete the value.
            //So we just do our best.
        }
    }

    void Produce( const T& t ) 
    {
        Node * node = new Node(t);
        Node * oldInsertionPoint = (Node *) InterLockedxChange((volatile void **)&InsertionPoint,node);
        oldInsertionPoint->next = node;
    }

    bool Consume( T& result ) 
    {
        if (Head->next)
        {
            Node * oldHead = Head;
            Head = Head->next;
            delete oldHead;
            result = Head->value;
            return true;
        }       
        return false;               // else report empty
    }

};
于 2017-10-24T17:13:56.180 回答