5

我开发了一些无锁数据结构并出现以下问题。

我有编写器线程,它在堆上创建对象并将它们包装在带有引用计数器的智能指针中。我也有很多读者线程,可以处理这些对象。代码可能如下所示:

SmartPtr ptr;

class Reader : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr local(ptr);
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr newPtr(new Object);    
           ptr = newPtr;  
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;) // wait for crash :(
}

当我创建它的线程本地副本时ptr,至少意味着

  1. 读取地址。
  2. 增加引用计数器。

我不能原子地执行这两个操作,因此有时我的读者会使用已删除的对象。

问题是 - 我应该使用什么样的智能指针来实现从多个线程进行读写访问并进行正确的内存管理?应该存在解决方案,因为Java程序员甚至不关心这样的问题,只是依靠所有对象都是引用并且只有在没有人使用它们时才被删除。

对于 PowerPC,我发现http://drdobbs.com/184401888看起来不错,但使用了 x86 中没有的 Load-Linked 和 Store-Conditional 指令。

据我了解,提升指针仅使用锁提供此类功能。我需要无锁解决方案。

4

4 回答 4

9

boost::shared_ptr 有 atomic_store ,它使用“无锁”自旋锁,对于 99% 的可能情况应该足够快。

    boost::shared_ptr<Object> ptr;
class Reader : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> local(boost::atomic_load(&ptr));
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> newPtr(new Object);    
           boost::atomic_store(&ptr, newPtr);
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;)
}

编辑:

针对下面的评论,实现在“boost/shared_ptr.hpp”中......

template<class T> void atomic_store( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock_pool<2>::scoped_lock lock( p );
    p->swap( r );
}

template<class T> shared_ptr<T> atomic_exchange( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock & sp = boost::detail::spinlock_pool<2>::spinlock_for( p );

    sp.lock();
    p->swap( r );
    sp.unlock();

    return r; // return std::move( r )
}
于 2011-11-04T09:42:52.413 回答
4

使用一些 jiggery-pokery,您应该能够使用 InterlockedCompareExchange128 完成此操作。将引用计数和指针存储在 2 元素 __int64 数组中。如果引用计数在数组 [0] 中并且指针在数组 [1] 中,则原子更新将如下所示:

while(true)
{
    __int64 comparand[2];
    comparand[0] = refCount;
    comparand[1] = pointer;
    if(1 == InterlockedCompareExchange128(
        array,
        pointer,
        refCount + 1,
        comparand))
    {
        // Pointer is ready for use. Exit the while loop.
    }
}

如果 InterlockedCompareExchange128 内在函数不适用于您的编译器,那么您可以使用底层的 CMPXCHG16B 指令来代替,如果您不介意在汇编语言中乱搞的话。

于 2011-11-04T10:04:00.670 回答
2

RobH 提出的解决方案不起作用。它与原始问题有相同的问题:访问引用计数对象时,它可能已经被删除。

我认为在没有全局锁(如 boost::atomic_store 中)或条件读/写指令的情况下解决问题的唯一方法是以某种方式延迟对象的销毁(或共享引用计数对象,如果使用这样的东西)。所以 zennehoy 有一个好主意,但他的方法太不安全了。

我可能这样做的方法是在编写器线程中保留所有指针的副本,以便编写器可以控制对象的销毁:

class Writer : public Thread {
    virtual void Run() {
        list<SmartPtr> ptrs; //list that holds all the old ptr values        

        for (;;) {
            SmartPtr newPtr(new Object);
            if(ptr)
                ptrs.push_back(ptr); //push previous pointer into the list
            ptr = newPtr;

            //Periodically go through the list and destroy objects that are not
            //referenced by other threads
            for(auto it=ptrs.begin(); it!=ptrs.end(); )
                if(it->refCount()==1)
                    it = ptrs.erase(it);
                else
                    ++it;
       }
    }
};

但是,对于智能指针类仍有要求。这不适用于 shared_ptr,因为读取和写入不是原子的。它几乎适用于 boost::intrusive_ptr。intrusive_ptr 上的赋值是这样实现的(伪代码):

//create temporary from rhs
tmp.ptr = rhs.ptr;
if(tmp.ptr)
    intrusive_ptr_add_ref(tmp.ptr);

//swap(tmp,lhs)
T* x = lhs.ptr;
lhs.ptr = tmp.ptr;
tmp.ptr = x;

//destroy temporary
if(tmp.ptr)
    intrusive_ptr_release(tmp.ptr);

据我了解,这里唯一缺少的是之前的编译器级别的内存栅栏lhs.ptr = tmp.ptr;。加上这一点,在严格的条件下读取rhs和写入lhs都是线程安全的:1)x86 或 x64 架构 2)原子引用计数 3)rhs在赋值期间引用计数不得为零(由上面的 Writer 代码保证)4)仅一个线程写入lhs(使用 CAS 你可以有几个作家)。

无论如何,您可以根据 intrusive_ptr 创建自己的智能指针类,并进行必要的更改。绝对比重新实现 shared_ptr 容易。此外,如果您想要性能,侵入式是您的最佳选择。

于 2011-11-04T21:11:53.823 回答
-2

这在java中更容易工作的原因是垃圾收集。在 C++ 中,您必须手动确保当您想要删除某个值时,该值不只是开始被其他线程使用。

我在类似情况下使用的解决方案是简单地延迟删除值。我创建了一个单独的线程,它遍历要删除的内容列表。当我想删除某些东西时,我会将它添加到这个带有时间戳的列表中。删除线程在实际删除该值之前等到此时间戳之后的某个固定时间。您只需确保延迟足够大,以保证该值的任何临时使用都已完成。

就我而言,100 毫秒就足够了,为了安全起见,我选择了几秒钟。

于 2011-11-04T09:52:02.027 回答