3

我真的不明白如何使某些数据结构无锁。例如,如果您有一个链表,那么您可以使用互斥锁围绕操作,或者如果在您忙于将新节点重新链接在一起时执行另一个线程,您可能会遇到竞争条件。

“无锁”的概念(我理解它并不意味着“无锁”,而是意味着线程可以在不等待其他线程完成的情况下继续前进)只是没有意义。

有人可以向我展示一个使用堆栈、队列或链表等实现为“无锁”的简单示例,因为我无法理解如何在不干扰其他线程生产力的情况下防止竞争条件?这两个目标肯定相互矛盾吗?

4

7 回答 7

4

无锁数据结构使用原子操作并且可能会施加额外的要求。例如,数据结构可能只对一个读取器和一个写入器线程或任何其他组合是安全的。在简单链表的情况下,将对节点指针使用原子读写,以保证多个线程可以安全地同时对其进行读写。

你可能会也可能不会逃脱惩罚。如果您需要有关数据结构和验证内容的额外保证,那么如果没有某种形式的高级锁定,您可能无法做到这一点。此外,即使考虑到如何使用数据结构的其他要求,也不是每个数据结构都允许重写为无锁。在这种情况下,不可变对象可能是一种解决方案,但它们通常会因复制而带来性能损失,这并不总是比锁定对象然后改变它更可取。

于 2014-03-01T20:56:16.223 回答
2

有不同的原语可以用来构造这样的无锁数据结构。例如,原子执行以下代码的比较和交换(简称 CAS):

CAS(x, o, n)
  if x == o:
    x = n
    return o
  else:
    return x

通过此操作,您可以进行原子更新。例如,考虑一个非常简单的链表,它按排序顺序存储元素,允许您插入新元素并检查元素是否已经存在。查找操作将像以前一样工作:它将遍历所有链接,直到找到一个元素,或者找到比查询更大的元素。插入需要更加小心。它可以按如下方式工作:

insert(lst, x)
  xn = new-node(x)
  n = lst.head
  while True:
    n = find-before(n, x)
    xn.next = next = n.next
    if CAS(n.next, next, x) == next:
      break

find-before(n,x)只是找到顺序中前面的元素x。当然,这只是一个草图。一旦您想要支持删除,事情就会变得更加复杂。我推荐 Herlihy 和 Shavit 的“多处理器编程艺术”。我还应该指出,切换实现相同模型的数据结构以使它们无锁通常是有利的。例如,如果你想实现 的等价物std::map,用红黑树来做会很痛苦,但跳过列表更易于管理。

于 2014-03-01T21:53:12.503 回答
2

我发现简单且可解释的是,首先您可以为基于锁(互斥)样式的数据结构编写伪代码,然后尝试查看您持有锁的变量如何通过 CAS 操作以无锁方式修改。虽然其他人已经给出了很好的答案,我想补充一点,只有当你自己实现时,你才能感受到它,当然,通过阅读它发表的一些研究论文中的一些伪代码。

这是我在 C++ 中实现的一个队列,用于多线程运行的验证测试:

#include<iostream>
#include<atomic>
#include<thread>
#include<vector>
#define N 1000
using namespace std;
class lf_queue
{
private:
    struct node
    {   int data;
        atomic<node*> next;
        node(int d):data(d)
        {}
    };
    atomic<node*> Head;
    atomic<node*> Tail;
public:
    lf_queue()
    {
        node *nnode= new node(-1);
        nnode->next=NULL;
        Head=nnode;
        Tail=nnode;
    }
    void enqueue(int data)
    {
        node *nnode= new node(data);
        nnode->next=NULL;
        node *tail,*next_p;
        while(true)
        {
            tail=Tail.load();
            next_p=tail->next;
            if(tail==Tail.load())
            {
                if(next_p==NULL)
                {
                    if((tail->next).compare_exchange_weak(next_p,nnode))
                    break;
                }
                else
                {
                    Tail.compare_exchange_weak(tail,next_p);
                }
            }
        }
        Tail.compare_exchange_weak(tail,nnode);
    }
    bool dequeue(int &res)
    {
        while(true)
        {
            node *head,*tail,*next_p;
            head=Head.load();
            tail=Tail.load();
            next_p=head->next;
            if(head==Head.load())
            {
                if(head==tail)
                {
                    if(next_p==NULL)
                        return false;
                    Tail.compare_exchange_weak(tail,next_p);
                }
                else
                {
                    res=next_p->data;
                    if(Head.compare_exchange_weak(head,next_p))
                        break;
                }
            }
        }//end loop
        return true;
    }
};
void producer(lf_queue &q)
{   //cout<<this_thread::get_id()<<"Inside producer\n";
    for(int i=0;i<N;i++)
    {
       q.enqueue(1);
     }
    //cout<<this_thread::get_id()<<" "<<"Finished producing\n";
}
void consumer(lf_queue &q,atomic<int>& sum)
{   //cout<<this_thread::get_id()<<" "<<"Inside consumer\n";
    for(int i=0;i<N;i++)
    {
        int res=0;
        while(!q.dequeue(res));
        sum+=res;
    }
    //cout<<this_thread::get_id()<<" "<<"Finished consuming\n";
}
int main()
{
    lf_queue Q;
    atomic<int> sum;
    sum.store(0);
    vector<thread> thread_pool;
    for(int i=0;i<10;i++)
    {   if(i%2==0)
        {   thread t(consumer,ref(Q),ref(sum));
            thread_pool.push_back(move(t));
        }
        else
        {
            thread t(producer,ref(Q));
            thread_pool.push_back(move(t));    
        }
    }
    for(int i=0;i<thread_pool.size();i++)
    thread_pool[i].join();
    cout<<"Final sum "<<sum.load()<<"\n";
    return 0;
}

我尝试使用 Harris 的论文实现无锁链表,但遇到了麻烦,你看 C++11 风格,你只能对 atomic<> 类型执行 CAS,而且这些 atomic<node*> 不能被强制转换为long 是为了进行位窃取,Harris 的实现使用它来逻辑地标记已删除的节点。但是,互联网上有可用的 C 代码实现使用低级 cas_ptr 操作,这为在地址和 long 之间进行转换提供了更大的灵活性。

于 2021-10-11T04:00:32.773 回答
1

无锁结构使用原子指令来获取资源的所有权。原子指令锁定它在 CPU 缓存级别工作的变量,向您保证其他内核不会干扰操作。

假设你有这些原子指令:

  • 读取(A)-> A
  • compare_and_swap(A, B, C) -> oldA = A; 如果 (A == B) { A = C }; 返回旧A;

使用这些指令,您可以简单地创建一个堆栈:

template<typename T, size_t SIZE>
struct LocklessStack
{
public:
  LocklessStack() : top(0)
  {
  }
  void push(const T& a)
  {
     int slot;
     do
     {
       do
       {
         slot = read(top);
         if (slot == SIZE)
         {
           throw StackOverflow();
         }
       }while(compare_and_swap(top, slot, slot+1) == slot);
       // NOTE: If this thread stop here. Another thread pop and push
       //       a value, this thread will overwrite that value [ABA Problem].
       //       This solution is for illustrative porpoise only
       data[slot] = a;
     }while( compare_and_swap(top, slot, slot+1) == slot );
  }
  T pop()
  {
     int slot;
     T temp;
     do
     {
       slot = read(top);
       if (slot == 0)
       {
         throw StackUnderflow();
       }
       temp = data[slot-1];
     }while(compare_and_swap(top, slot, slot-1) == slot);
     return temp;
  }
private:
  volatile int top;
  T data[SIZE];
};

volatile 是必需的,因此编译器在优化期间不会弄乱操作顺序。发生两个并发推送:

第一个进入while循环并读取槽,然后第二个推送到达,读取顶部,比较和交换(CAS)成功并递增顶部。另一个线程唤醒,CAS失败并读取另一个时间顶部..

发生两个并发弹出:

真的和上一个案例类似。还需要读取值。

一个 pop 和 push 同时发生:

pop 读取顶部,读取温度.. push 输入并修改顶部并推送新值。Pop CAS 失败,pop() 将再次循环并读取新值

或者

push 读取顶部并获取一个插槽,弹出输入并修改顶部值。推动 CAS 失败,必须再次循环推动较低的指数。

显然这在并发环境中是不正确的

stack.push(A);
B = stack.pop();
assert(A == B); // may fail

因为虽然 push 是原子的,pop 是原子的,但它们的组合不是原子的。

游戏编程 gem 6的第一章是一个很好的参考。

请注意,代码未经测试,原子可能真的很讨厌

于 2014-03-01T21:45:50.897 回答
0

假设一个简单的操作,将变量加一。如果您使用“将变量从内存读取到 cpu,将 1 加到 cpu 寄存器,将变量写回”来实现这一点,那么您必须在整个事物周围放置某种互斥锁,因为您要确保第二个线程第一个写回变量之前不会读取变量。

如果您的处理器具有原子“增量内存位置”汇编指令,则不需要锁。

或者,假设您想将一个元素插入到链表中,这意味着您需要使起始指针指向新元素,然后使新元素指向之前的第一个元素。使用原子“交换两个内存单元”操作,您可以将当前开始指针写入新元素的“下一个”指针,然后交换两个指针 - 现在,取决于哪个线程首先运行,元素将不同列表中的顺序,但列表数据结构保持不变。

基本上,它总是关于“一次做几件事,在一个原子操作中,所以你不能将操作分解成可能不会被中断的单个部分”。

于 2014-03-01T20:59:35.073 回答
0

给你 - 一个非常基本的(push_front)无锁列表:

template <class T>
class LockFreeList {    
public:
    struct Node {
        T value_;
        Node* next_;
        Node(const T& value) : value_(value), next_(nullptr) {
        }
    };
    
    void push_front(const T& value) {
        Node* new_node = new Node(value);
        Node* old_head = head_;
        do { new_node->next_ = old_head; }
        while (!head_.compare_exchange_weak(old_head, new_node));
    }
    
private:
    std::atomic<Node*> head_;
}; 

受 Fedor Pikus 的 CppCon 2017 演讲的启发,请参阅: https ://youtu.be/ZQFzMfHIxng?t=2432

稍作改动:push_back 使用 compare_exchange_weak。

于 2022-01-18T14:07:44.793 回答
0

您对无锁的定义是错误的。

无锁允许单个线程饿死,但保证系统范围的吞吐量。一个算法是无锁的,如果它满足当程序线程运行足够长的时间时,至少有一个线程取得进展(对于进展的一些合理定义) https://en.wikipedia.org/wiki/Non-blocking_algorithm

这意味着,如果有多个线程访问数据结构,则只会授予 1 个;其余的将失败

关于无锁的重要一点是内存冲突的概率。使用锁保护的数据结构通常比使用原子变量的实现要快,但它不能很好地扩展,碰撞的可能性很小。

示例:多个线程不断 push_back 列表中的数据。这将导致许多冲突,并且经典互斥体很好。但是,如果您有 1 个线程将数据推送到列表末尾,并且有 1 个线程将数据弹出在前面,情况就不同了。如果列表不为空,则 push_back() 和 pop_front() 不会发生冲突(取决于实现),因为它们不在同一个对象上工作。但是仍然有一个空列表的变化,所以你仍然需要保护访问。在这种情况下,无锁将是更好的解决方案,因为您可以同时调用这两个函数而无需等待。

简而言之:无锁是为大型数据结构设计的,其中多个写入器大多是分开的,很少发生冲突。

不久前我尝试自己实现一个无锁列表容器...... https://codereview.stackexchange.com/questions/123201/lock-free-list-in-c

于 2016-10-18T07:10:35.290 回答