4

我正在尝试使用 C++实现基于惰性并发列表的集合shared_ptr。我的理由是unreachable nodes最后一个会自动释放shared_ptr。根据我的理解,对 a 的递增和递减操作shared_ptr's reference count是原子的。这意味着只有参考该节点的最后一个 shared_ptr应该为该节点调用delete/free。我为多个线程double free called运行程序,但我的程序因错误或只是Segmentation Fault(SIGSEGV)而崩溃。我不明白这怎么可能。下面给出的是我的实现代码,方法名称表示它们的预期操作。

#include<thread>
#include<iostream>
#include<mutex>
#include<climits>

using namespace std;

class Thread
{
   public:
      std::thread t;  
};
int n=50,ki=100,kd=100,kc=100;`/*no of threads, no of inserts,deletes & searches*/`


class Node
{
public:
      int key;
      shared_ptr<Node> next;
      bool marked;
      std::mutex nodeLock;

      Node() {
         key=0;
         next = nullptr;
         marked = false;
      }

      Node(int k) {
         key = k;
         next = nullptr;
         marked = false;
      }

      void lock() {
         nodeLock.lock();
      }

      void unlock() {
         nodeLock.unlock();
      }

      ~Node()
      {
      }
};

class List {
   shared_ptr<Node> head;
   shared_ptr<Node> tail;

public:

   bool validate(shared_ptr<Node> pred, shared_ptr<Node> curr) {
      return !(pred->marked) && !(curr->marked) && ((pred->next) == curr);
   }

   List() {
      head=make_shared<Node>(INT_MIN);
      tail=make_shared<Node>(INT_MAX);
      head->next=tail;
   }

   bool add(int key)
   {
      while(true)
      {
         /*shared_ptr<Node> pred = head;
         shared_ptr<Node> curr = pred->next;*/
        auto pred = head;
        auto curr = pred->next;

         while (key>(curr->key))
         {
            pred = curr;
            curr = curr->next;
         }

         pred->lock();
         curr->lock();

         if (validate(pred,curr))
         {
            if (curr->key == key)
            {
               curr->unlock();
               pred->unlock();
               return false;
            }
            else
            {
                shared_ptr<Node> newNode(new Node(key));
               //auto newNode = make_shared<Node>(key);
                //shared_ptr<Node> newNode = make_shared<Node>(key);
                newNode->next = curr;
                pred->next = newNode;
                curr->unlock();
                pred->unlock();
                return true;
            }
         }
         curr->unlock();
         pred->unlock();
      }
   }

   bool remove(int key)
   {
      while(true)
      {
         /*shared_ptr<Node> pred = head;
         shared_ptr<Node> curr = pred->next;*/

        auto pred = head;
        auto curr = pred->next;

         while (key>(curr->key))
         {
            pred = curr;
            curr = curr->next;
         }

         pred->lock();
         curr->lock();

         if (validate(pred,curr))
         {
            if (curr->key != key)
            {
               curr->unlock();
               pred->unlock();
               return false;
            }
            else
            {
               curr->marked = true;
               pred->next = curr->next;
               curr->unlock();
               pred->unlock();
               return true;
            }
         }
         curr->unlock();
         pred->unlock();
      }
   }

   bool contains(int key) {
      //shared_ptr<Node> curr = head->next;
    auto curr = head->next;

      while (key>(curr->key)) {
         curr = curr->next;
      }
      return curr->key == key && !curr->marked;
   }
}list;

void test(int curr)
{
   bool test;
    int time;

    int val, choice;
    int total,k=0;
    total=ki+kd+kc;

    int i=0,d=0,c=0;

    while(k<total)
    {
        choice = (rand()%3)+1;

        if(choice==1)
        {
            if(i<ki)
            {
                val = (rand()%99)+1;
                test = list.add(val);
                i++;
                k++;
            }
        }
        else if(choice==2)
        {
            if(d<kd)
            {
                val = (rand()%99)+1;
                test = list.remove(val);
                d++;
                k++;
            }
        }
        else if(choice==3)
        {
            if(c<kc)
            {
                val = (rand()%99)+1;
                test = list.contains(val);
                c++;
                k++;
            }
        }
    }
}

int main()
{
   int i;

   vector<Thread>thr(n);

   for(i=0;i<n;i++)
   {
      thr[i].t = thread(test,i+1);
   }
   for(i=0;i<n;i++)
   {
      thr[i].t.join();
   }
   return 0;
}

我无法弄清楚上面的代码有什么问题。错误每次都不同,其中一些只是SEGFAULTS

pure virtual method called
terminate called without an active exception
Aborted (core dumped)

您能否指出我在上面的代码中做错了什么?以及如何解决该错误?
编辑:添加了一个非常粗略test function的随机调用三个list methods. 此外,线程数和每个操作的数量是全局声明的。粗略的编程,但它重新创建了SEGFAULT

4

1 回答 1

5

问题是您没有为您shared_ptr的 s 使用原子存储和加载操作。

确实,a 的控制块中的引用计数(每个shared_ptr参与特定共享对象的所有权的人都有一个指向的指针)shared_ptr是原子的,但是,它本身的数据成员shared_ptr不是。

因此,让多个线程各自拥有一个共享对象是安全的,但是只要其中至少一个使用非常量成员函数,shared_ptr让多个线程访问相同的线程并不保存,这就是你的shared_ptr'在重新分配next指针时正在做。

说明问题

让我们看一下 libstdc++shared_ptr实现的(简化和美化的)复制构造函数:

shared_ptr(const shared_ptr& rhs)
 : m_ptr(rhs.m_ptr),
   m_refcount(rhs.m_refcount) 
{ }

这里m_ptr只是一个指向共享对象的原始指针,m_refcount是一个执行引用计数并处理最终删除对象m_ptr指向的类。

只是一个可能出错的例子:假设当前一个线程正试图找出一个特定的键是否包含在列表中。它从 中的复制初始化auto curr = head->next开始List::contains。就在它设法初始化curr.m_ptr操作系统调度程序之后,决定该线程必须暂停并在另一个线程中调度。

另一个线程正在删除head. 由于head->next仍然的引用计数为 1(毕竟,head->next线程 1 的引用计数尚未修改),当第二个线程完成删除节点时,它正在被删除。

然后一段时间后,第一个线程继续。它完成了 的初始化curr,但由于m_ptr在线程 2 开始删除之前已经初始化,所以它仍然指向现在删除的节点。尝试比较key > curr->key线程 1 时将访问无效内存。

使用 std::atomic_load 和 std::atomic_store 来防止问题

std::atomic_load并通过在调用指针传入std::atomic_store的复制构造函数/复制分配操作符之前锁定互斥锁来防止问题发生。shared_ptr如果跨多个线程共享的所有对s的读取和写入shared_ptr都是通过std::atomic_load/ std::atomic_storeresp。m_ptr在另一个线程开始读取或修改同一个线程时,永远不会发生一个线程只修改了引用计数但没有修改引用计数的情况shared_ptr

通过必要的原子加载和存储,List成员函数应如下所示:

bool validate(Node const& pred, Node const& curr) {
   return !(pred.marked) && !(curr.marked) && 
          (std::atomic_load(&pred.next).get() == &curr);
}

bool add(int key) {
    while (true) {
        auto pred = std::atomic_load(&head);
        auto curr = std::atomic_load(&pred->next);

        while (key > (curr->key)) {
            pred = std::move(curr);
            curr = std::atomic_load(&pred->next);
        }

        std::scoped_lock lock{pred->nodeLock, curr->nodeLock};
        if (validate(*pred, *curr)) {
            if (curr->key == key) {
                return false;
            } else {
                auto new_node = std::make_shared<Node>(key);

                new_node->next = std::move(curr);
                std::atomic_store(&pred->next, std::move(new_node));
                return true;
            }
        }
    }
}

bool remove(int key) {
    while (true) {
        auto pred = std::atomic_load(&head);
        auto curr = std::atomic_load(&pred->next);

        while (key > (curr->key)) {
            pred = std::move(curr);
            curr = std::atomic_load(&pred->next);
        }

        std::scoped_lock lock{pred->nodeLock, curr->nodeLock};
        if (validate(*pred, *curr)) {
            if (curr->key != key) {
                return false;
            } else {
                curr->marked = true;
                std::atomic_store(&pred->next, std::atomic_load(&curr->next));
                return true;
            }
        }
    }
}

bool contains(int key) {
    auto curr = std::atomic_load(&head->next);

    while (key > (curr->key)) {
        curr = std::atomic_load(&curr->next);
    }
    return curr->key == key && !curr->marked;
}

此外,您还应该制作Node::marked一个std::atomic_bool.

于 2018-02-11T01:01:43.490 回答