1

我想从原子 uint32 拼凑一个 uint64 原子计数器。计数器有一个写入器和多个读取器。writer 是一个信号处理程序,因此它不能阻塞。

我的想法是使用低位的生成计数作为读锁。读取器重试,直到整个读取过程中生成计数稳定,并且低位未设置。

以下代码在设计和使用内存排序方面是否正确?有没有更好的办法?

using namespace std;
class counter {
    atomic<uint32_t> lo_{};
    atomic<uint32_t> hi_{};
    atomic<uint32_t> gen_{};

    uint64_t read() const {
        auto acquire = memory_order_acquire;
        uint32_t lo, hi, gen1, gen2;
        do {
            gen1 = gen_.load(acquire);
            lo = lo_.load(acquire);
            hi = hi_.load(acquire);
            gen2 = gen_.load(acquire);
        } while (gen1 != gen2 || (gen1 & 1));
        return (uint64_t(hi) << 32) | lo;
    }

    void increment() {
        auto release = memory_order_release;
        gen_.fetch_add(1, release);
        uint32_t newlo = 1 + lo_.fetch_add(1, release);
        if (newlo == 0) {
            hi_.fetch_add(1, release);
        }
        gen_.fetch_add(1, release);
    }
};

编辑:哎呀,已修复auto acquire = memory_order_release;

4

1 回答 1

7

这是一种已知模式,称为 SeqLock。 https://en.wikipedia.org/wiki/Seqlock。(简化为只有一个作家,因此不需要额外支持排除同时作家。)

您不需要或不希望计数器变量本身的增量使用原子 RMW 操作。您可以使用原子 32 位加载加载两半,将其递增,然后以原子方式存储结果。(使用廉价relaxedrelease记忆订单,并使用release商店进行第二次柜台更新)。

同样,计数器也不需要是原子 RMW。

作者只需要纯加载和纯存储,只有发布排序,这比原子 RMW 便宜(很多),或者使用 seq_cst 排序的存储

  • 以任意顺序加载计数器和值
  • 存储一个新计数器(旧+1)
  • 存储新值(或者如果你想无进位分支,则只更新下半部分)
  • 存储最终计数器。

这三个要点中商店的顺序是唯一重要的事情。在第一个存储之后的写栅栏可能会很好,因为我们真的不希望在比放松更昂贵的 CPU 上同时创建两个存储的成本。release


不幸的是,为了满足 C++ 规则,value必须是atomic<T>,这使得让编译器生成可能加载两半的最有效代码变得不方便。例如 ARM ldp/ stpload-pair 可能不是原子的,但这没关系。(而且编译器通常不会将两个单独的原子 32 位加载优化为一个更广泛的加载。)

序列计数器为奇数时其他线程读取的值无关紧要,但我们希望避免未定义的行为。也许我们可以使用 avolatile uint64_t和 an的并集atomic<uint64_t>


我为另一个我没有完成答案的问题编写了这个 C++SeqLock<class T>模板(找出哪些版本的 ARM 具有 64 位原子加载和存储)。

这会尝试检查目标是否已经支持无锁原子操作,atomic<T>以阻止您在无意义时使用它。(通过定义来禁用测试目的IGNORE_SIZECHECK。) TODO:透明地回退到这样做,可能使用模板专业化,而不是使用static_assert.

我提供了一个支持运算符的inc()函数。TODO 将是一个接受 lambda 以对 a 做某事的 a ,并在序列计数器更新之间存储结果。T++apply()T

// **UNTESTED**

#include <atomic>

#ifdef UNIPROCESSOR
// all readers and writers run on the same core
// ordering instructions at compile time is all that's necessary
#define ATOMIC_FENCE std::atomic_signal_fence
#else
// A reader can be running on another core while writing
// memory barriers or ARMv8 acquire / release loads / store are needed
#define ATOMIC_FENCE std::atomic_thread_fence
#endif
// using fences instead of .store(std::memory_order_release) will stop the compiler
// from taking advantage of a release-store instruction, like on AArch64 or x86


// SINGLE WRITER only.
// uses volatile + barriers for the data itself, like pre-C++11
template <class T>
class SeqLocked
{
#ifndef IGNORE_SIZECHECK
    // sizeof(T) > sizeof(unsigned)
    static_assert(!std::atomic<T>::is_always_lock_free, "A Seq Lock with a type small enough to be atomic on its own is totally pointless, and we don't have a specialization that replaces it with a straight wrapper for atomic<T>");
#endif

       // C++17 doesn't have a good way to express a load that doesn't care about tearing
       //  without explicitly writing it as multiple small parts and thus gimping the compiler if it can use larger loads
    volatile T data;          // volatile should be fine on any implementation where pre-C++11 lockless code was possible with volatile,
                              //  even though Data Race UB does apply to volatile variables in ISO C++11 and later.

    std::atomic<unsigned> seqcount{0};  // Even means valid, odd means modification in progress.
                                        //  unsigned wraps around at a power of 2 on overflow

public:
    T get() const {
        unsigned c0, c1;
        T tmp;

        do {
            c0 = seqcount.load(std::memory_order_relaxed);  // or this can be a std::memory_order_acquire for multicore so AArch64 can use LDAR
            ATOMIC_FENCE(std::memory_order_acquire);

            tmp = (T)data;       // load

            ATOMIC_FENCE(std::memory_order_acquire);  // LoadLoad barrier
            c1 = seqcount.load(std::memory_order_relaxed);
        } while(c0&1 || c0 != c1);     // retry if the counter changed or is odd

        return tmp;
    }

    // TODO: a version of this that takes a lambda for the operation on tmp
    T inc() {
        unsigned orig_count = seqcount.load(std::memory_order_relaxed);

        seqcount.store(orig_count+1, std::memory_order_relaxed);
        ATOMIC_FENCE(std::memory_order_release);
        // make sure the data stores appear after the first counter update.

        T tmp = data;  // load
        ++tmp;
        data = tmp;    // store

        ATOMIC_FENCE(std::memory_order_release);
        seqcount.store(orig_count+2, std::memory_order_relaxed);  // Or use mo_release here, better on AArch64

        return tmp;
    }

    void set(T newval) {
        unsigned orig_count = seqcount.load(std::memory_order_relaxed);

        seqcount.store(orig_count+1, std::memory_order_relaxed);
        ATOMIC_FENCE(std::memory_order_release);
        // make sure the data stores appear after the first counter update.

        data = newval;    // store

        ATOMIC_FENCE(std::memory_order_release);
        seqcount.store(orig_count+2, std::memory_order_relaxed);  // Or use mo_release here, better on AArch64
    }

};


/***** test callers *******/
#include <stdint.h>

struct sixteenbyte {
    //unsigned arr[4];
    unsigned long  a,b,c,d;
    sixteenbyte() = default;
    sixteenbyte(const volatile sixteenbyte &old)
         : a(old.a), b(old.b), c(old.c), d(old.d) {}
    //arr(old.arr) {}
};

void test_inc(SeqLocked<uint64_t> &obj) {  obj.inc(); }
sixteenbyte test_get(SeqLocked<sixteenbyte> &obj) { return obj.get(); }
//void test_set(SeqLocked<sixteenbyte> &obj, sixteenbyte val) { obj.set(val); }

uint64_t test_get(SeqLocked<uint64_t> &obj) {
    return obj.get();
}

// void atomic_inc_u64_seq_cst(std::atomic<uint64_t> &a) { ++a; }
uint64_t u64_inc_relaxed(std::atomic<uint64_t> &a) {
    // same but without dmb barriers
    return 1 + a.fetch_add(1, std::memory_order_relaxed);
}

uint64_t u64_load_relaxed(std::atomic<uint64_t> &a) {
    // gcc uses LDREXD, not just LDRD?
    return a.load(std::memory_order_relaxed);
}

void u64_store_relaxed(std::atomic<uint64_t> &a, uint64_t val) {
    // gcc uses a LL/SC retry loop even for a pure store?
    a.store(val, std::memory_order_relaxed);
}

它编译为我们在ARM 和其他 ISA的 Godbolt 编译器资源管理器上想要的 asm 。至少对于 int64_t; volatile由于繁琐的规则,较大的结构类型可能复制效率较低。

它使用非原子volatile T data的共享数据。这在技术上是数据争用未定义行为,但我们在实践中使用的所有编译器都可以很好地支持 C++11 之前的多线程volatile对象访问。在 C++11 之前,人们甚至依赖于某些大小的原子性。我们没有,我们检查计数器,并且仅在没有并发写入的情况下使用我们读取的值。(这就是 SeqLock 的全部意义所在。)

一个问题volatile T data是,在 ISO C++ 中,T foo = data除非您提供来自对象的复制构造函数,否则不会为结构对象编译volatile,例如

sixteenbyte(const volatile sixteenbyte &old)
         : a(old.a), b(old.b), c(old.c), d(old.d) {}

这对我们来说真的很烦人,因为我们并不关心如何读取内存的细节,只是多次读取没有优化为一次。

volatile在这里确实是错误的工具,并且T data带有足够的围栏以确保读取实际上发生在原子计数器的读取之间会更好。例如,我们可以在 GNU C 中使用asm("":::"memory");编译器屏障来防止在访问之前/之后重新排序。这将让编译器使用 SIMD 向量复制更大的对象,或者其他任何东西,它不会通过单独的volatile访问来完成。

我认为std::atomic_thread_fence(mo_acquire)这也是一个足够的障碍,但我不是 100% 确定。


在 ISO C 中,您可以复制一个volatile聚合(结构),编译器将发出通常复制那么多字节的任何 asm。但是在 C++ 中,我们显然不能拥有好东西。

于 2019-02-09T22:05:21.143 回答