10

我为需要跨线程同步的非常简单的数据块编写了一个容器。我想要最好的表现。我不想使用锁。

我想使用“放松”的原子。部分是为了那一点额外的魅力,部分是为了真正理解它们。

我一直在做这方面的工作,而且我正处于这段代码通过了我对其进行的所有测试的地步。不过,这还不是“证据”,所以我想知道我是否缺少任何东西,或者我可以通过其他任何方式来测试它吗?

这是我的前提:

  • 唯一重要的是正确推送和弹出节点,并且堆栈永远不会失效。
  • 我相信内存中的操作顺序只在一个地方很重要:
    • 在 compare_exchange 操作本身之间。即使使用宽松的原子,也可以保证这一点。
  • “ABA”问题通过向指针添加标识号来解决。在 32 位系统上,这需要双字 compare_exchange,而在 64 位系统上,指针的未使用的 16 位用 id 号填充。
  • 因此:堆栈将始终处于有效状态。 (正确的?)

这就是我的想法。“通常”,我们对正在阅读的代码进行推理的方式是查看它的编写顺序。内存可以“乱序”读取或写入,但不能以使程序正确性无效的方式。

这在多线程环境中发生了变化。这就是内存栅栏的用途——这样我们仍然可以查看代码并能够推断它是如何工作的。

所以,如果这里一切都乱了套,那我用宽松的原子做什么呢?是不是有点太远了?

我不这么认为,但这就是我在这里寻求帮助的原因。

compare_exchange 操作本身保证了彼此的顺序不变性。

对原子进行读取或写入的唯一其他时间是在 compare_exchange 之前获取头部的初始值。它被设置为变量初始化的一部分。据我所知,此操作是否带回“正确”值是无关紧要的。

当前代码:

struct node
{
    node *n_;
#if PROCESSOR_BITS == 64
    inline constexpr node() : n_{ nullptr }                 { }
    inline constexpr node(node* n) : n_{ n }                { }
    inline void tag(const stack_tag_t t)                    { reinterpret_cast<stack_tag_t*>(this)[3] = t; }
    inline stack_tag_t read_tag()                           { return reinterpret_cast<stack_tag_t*>(this)[3]; }
    inline void clear_pointer()                             { tag(0); }
#elif PROCESSOR_BITS == 32
    stack_tag_t t_;
    inline constexpr node() : n_{ nullptr }, t_{ 0 }        { }
    inline constexpr node(node* n) : n_{ n }, t_{ 0 }       { }
    inline void tag(const stack_tag_t t)                    { t_ = t; }
    inline stack_tag_t read_tag()                           { return t_; }
    inline void clear_pointer()                             { }
#endif
    inline void set(node* n, const stack_tag_t t)           { n_ = n; tag(t); }
};

using std::memory_order_relaxed;
class stack
{
public:
    constexpr stack() : head_{}{}
    void push(node* n)
    {
        node next{n}, head{head_.load(memory_order_relaxed)};
        do
        {
            n->n_ = head.n_;
            next.tag(head.read_tag() + 1);
        } while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));
    }

    bool pop(node*& n)
    {
        node clean, next, head{head_.load(memory_order_relaxed)};
        do
        {
            clean.set(head.n_, 0);

            if (!clean.n_)
                return false;

            next.set(clean.n_->n_, head.read_tag() + 1);
        } while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));

        n = clean.n_;
        return true;
    }
protected:
    std::atomic<node> head_;
};

与其他问题相比,这个问题有什么不同?轻松的原子。他们对这个问题有很大的影响。

所以你怎么看?有什么我想念的吗?

4

3 回答 3

4

pushnode->_next已损坏,因为您在失败后不更新compareAndSwap。当下一次尝试成功时,您最初存储的节点可能node->setNext已被另一个线程从堆栈顶部弹出。compareAndSwap结果,一些线程认为它已经从堆栈中弹出了一个节点,但该线程已将其放回堆栈中它应该是:

void push(Node* node) noexcept
{
    Node* n = _head.next();
    do {
        node->setNext(n);
    } while (!_head.compareAndSwap(n, node));
}

此外,由于nextsetNextuse memory_order_relaxed,不能保证_head_.next()这里返回最近推送的节点。从堆栈顶部泄漏节点是可能的。同样的问题显然也存在pop_head.next()可能会返回一个先前但不再位于堆栈顶部的节点。如果返回值为nullptr,则当堆栈实际上不为空时,您可能无法弹出。

pop如果两个线程同时尝试从堆栈中弹出最后一个节点,也可能具有未定义的行为。他们都看到相同的值_head.next(),一个线程成功完成弹出。另一个线程进入 while 循环 - 因为观察到的节点指针不是nullptr- 但compareAndSwap循环很快将其更新为,nullptr因为堆栈现在是空的。在循环的下一次迭代中,该 nullptr 被取消以获取它的_next指针,随之而来的是很多欢闹。

pop显然也患有ABA。两个线程可以在栈顶看到同一个节点。假设一个线程到达评估_next指针的地步,然后阻塞。另一个线程成功弹出节点,推送 5 个新节点,然后在另一个线程唤醒之前再次推送该原始节点。其他线程compareAndSwap将成功 - 栈顶节点是相同的 - 但将旧_next值存储到_head而不是新值。其他线程推送的五个节点全部泄露。情况memory_order_seq_cst也是如此。

于 2014-07-18T21:25:16.490 回答
2

暂且不说实现pop操作的难度,我认为memory_order_relaxed是不够的。在推送节点之前,假设将向其中写入一些值,这些值将在节点弹出时被读取。您需要一些同步机制来确保值在被读取之前已经被实际写入。 memory_order_relaxed不提供同步... memory_order_acquire/memory_order_release会。

于 2014-07-20T13:53:43.867 回答
2

这段代码完全被破坏了。

这似乎可行的唯一原因是,当前的编译器对跨原子操作的重新排序不是很积极,而且 x86 处理器有很强的保证。

第一个问题是,如果没有同步,就无法保证这个数据结构的客户端甚至会看到要初始化的节点对象的字段。下一个问题是,如果没有同步,推送操作可以读取头部标签的任意旧值。

我们开发了一个工具 CDSChecker,它可以模拟内存模型允许的大多数行为。它是开源和免费的。在您的数据结构上运行它以查看一些有趣的执行。

在这一点上,证明任何关于使用宽松原子的代码都是一个巨大的挑战。大多数证明方法都失败了,因为它们通常本质上是归纳的,并且您没有要归纳的命令。因此,您可以凭空解决阅读问题...

于 2015-01-29T09:59:36.960 回答