39

我最近使用三重缓冲区的 std::atomic 移植到 C++11,用作并发同步机制。这种线程同步方法背后的想法是,对于生产者-消费者的情况,你有一个运行得更快的生产者消费者,三重缓冲可以带来一些好处,因为生产者线程不会因为不得不等待消费者而“减慢”速度。在我的例子中,我有一个以~120fps 更新的物理线程和一个以~60fps 运行的渲染线程。显然,我希望渲染线程总是尽可能获得最新的状态,但我也知道我会从物理线程中跳过很多帧,因为速率不同。另一方面,我希望我的物理线程保持其恒定的更新率,并且不受锁定数据的较慢渲染线程的限制。

原始的 C 代码是由 remis-thoughts 编写的,完整的解释在他的博客中。我鼓励任何有兴趣阅读它以进一步了解原始实现的人。

我的实现可以在这里找到。

基本思想是拥有一个具有 3 个位置(缓冲区)和一个原子标志的数组,该标志通过比较和交换来定义在任何给定时间哪些数组元素对应于什么状态。这样,只有一个原子变量用于对数组的所有 3 个索引和三重缓冲背后的逻辑进行建模。缓冲区的 3 个位置被命名为 Dirty、Clean 和 Snap。Producer总是写入 Dirty 索引,并且可以翻转 writer 以将 Dirty 与当前的 Clean 索引交换消费者可以请求一个新的Snap,它将当前的 Snap 索引与 Clean 索引交换以获得最新的缓冲区。消费者总是在Snap 位置读取缓冲区。

该标志由一个 8 位无符号整数组成,这些位对应于:

(未使用)(新写入)(2x 脏)(2x 干净)(2x 快照)

newWrite 额外位标志由写入器设置并由读取器清除。读者可以使用它来检查自上次快照以来是否有任何写入,如果没有,则不会再进行一次快照。可以使用简单的按位运算获得标志和索引。

现在好了,代码:

template <typename T>
class TripleBuffer
{

public:

  TripleBuffer<T>();
  TripleBuffer<T>(const T& init);

  // non-copyable behavior
  TripleBuffer<T>(const TripleBuffer<T>&) = delete;
  TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;

  T snap() const; // get the current snap to read
  void write(const T newT); // write a new value
  bool newSnap(); // swap to the latest value, if any
  void flipWriter(); // flip writer positions dirty / clean

  T readLast(); // wrapper to read the last available element (newSnap + snap)
  void update(T newT); // wrapper to update with a new element (write + flipWriter)

private:

  bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
  uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
  uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes

  // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
  // newWrite   = (flags & 0x40)
  // dirtyIndex = (flags & 0x30) >> 4
  // cleanIndex = (flags & 0xC) >> 2
  // snapIndex  = (flags & 0x3)
  mutable atomic_uint_fast8_t flags;

  T buffer[3];
};

执行:

template <typename T>
TripleBuffer<T>::TripleBuffer(){

  T dummy = T();

  buffer[0] = dummy;
  buffer[1] = dummy;
  buffer[2] = dummy;

  flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){

  buffer[0] = init;
  buffer[1] = init;
  buffer[2] = init;

  flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
T TripleBuffer<T>::snap() const{

  return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}

template <typename T>
void TripleBuffer<T>::write(const T newT){

  buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}

template <typename T>
bool TripleBuffer<T>::newSnap(){

  uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
  do {
    if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
      return false;
  } while(!flags.compare_exchange_weak(flagsNow,
                                       swapSnapWithClean(flagsNow),
                                       memory_order_release,
                                       memory_order_consume));
  return true;
}

template <typename T>
void TripleBuffer<T>::flipWriter(){

  uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
  while(!flags.compare_exchange_weak(flagsNow,
                                     newWriteSwapCleanWithDirty(flagsNow),
                                     memory_order_release,
                                     memory_order_consume));
}

template <typename T>
T TripleBuffer<T>::readLast(){
    newSnap(); // get most recent value
    return snap(); // return it
}

template <typename T>
void TripleBuffer<T>::update(T newT){
    write(newT); // write new value
    flipWriter(); // change dirty/clean buffer positions for the next update
}

template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
    // check if the newWrite bit is 1
    return ((flags & 0x40) != 0);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
    // swap snap with clean
    return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
    // set newWrite bit to 1 and swap clean with dirty 
    return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}

如您所见,我决定使用Release-Consume模式进行内存排序。存储的释放(memory_order_release) 确保当前线程中的写入不能在存储之后重新排序。另一方面,Consume确保当前线程中依赖于当前加载的值的读取不会在此加载之前重新排序。这确保了对释放相同原子变量的其他线程中的因变量的写入在当前线程中是可见的。

如果我的理解是正确的,因为我只需要原子设置标志,编译器可以自由地重新排序不直接影响标志的其他变量的操作,从而进行更多优化。通过阅读一些关于新内存模型的文档,我也知道这些宽松的原子只会在 ARM 和 POWER 等平台上产生显着影响(它们主要是因为它们而引入的)。由于我的目标是 ARM,我相信我可以从这些操作中受益,并且能够挤出更多的性能。

现在的问题:

对于这个特定问题,我是否正确使用了 Release-Consume 宽松排序?

谢谢,

安德烈

PS:对不起,很长的帖子,但我认为需要一些体面的上下文才能更好地了解问题。

编辑: 实施@Yakk的建议:

  • 修复flags了 read onnewSnap()flipWriter()使用直接赋值的问题,因此使用 default load(std::memory_order_seq_cst)
  • 为清晰起见,将位摆弄操作移至专用功能。
  • 添加bool了返回类型newSnap(),现在在没有新内容时返回 false,否则返回 true。
  • 使用习惯用法将类定义为不可= delete复制,因为如果使用了复制构造函数和赋值构造函数,则它们都是不安全的TripleBuffer

编辑 2: 修复了不正确的描述(感谢@Useless)。是消费者请求一个新的 Snap 并从 Snap 索引中读取数据(而不是“作者”)。抱歉打扰了,感谢无用的指出。

编辑 3:根据@Display Name 的建议 优化了newSnap()and功能,有效地去除了每个循环周期的 2 个冗余 's。flipriter()load()

4

2 回答 2

3

为什么在 CAS 循环中加载旧标志值两次?第一次是flags.load(),第二次是compare_exchange_weak(),标准在 CAS 失败时指定的将前一个值加载到第一个参数中,在这种情况下是 flagsNow。

根据http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange,“否则,将存储在 *this 中的实际值加载到预期中(执行加载操作)。 ”所以你的循环正在做的是失败时,compare_exchange_weak()重新加载flagsNow,然后循环重复,第一条语句在加载后立即再次加载它compare_exchange_weak()。在我看来,您的循环应该将负载拉到循环之外。例如,newSnap()将是:

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do
{
    if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume));

flipWriter()

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume));
于 2014-04-23T00:10:04.583 回答
1

是的,这是memory_order_acquire和memory_order_consume之间的区别,但是当你每秒使用180个左右时你不会注意到它。如果你想知道数字的答案,你可以用 m2 = memory_order_consume 运行我的测试。只需将 producer_or_consumer_Thread 更改为:

TripleBuffer <int> tb;

void producer_or_consumer_Thread(void *arg)
{
    struct Arg * a = (struct Arg *) arg;
    bool succeeded = false;
    int i = 0, k, kold = -1, kcur;

    while (a->run)
    {
        while (a->wait) a->is_waiting = true; // busy wait
        if (a->producer)
        {
            i++;
            tb.update(i);
            a->counter[0]++;
        }
        else
        {
            kcur = tb.snap();
            if (kold != -1 && kcur != kold) a->counter[1]++;
            succeeded = tb0.newSnap();
            if (succeeded)
            {
                k = tb.readLast();
                if (kold == -1)
                    kold = k;
                else if (kold = k + 1)
                    kold = k;
                else
                    succeeded = false;
            }
            if (succeeded) a->counter[0]++;   
        }
    }
    a->is_waiting =  true;
}

测试结果:

_#_  __Produced __Consumed _____Total
  1    39258150   19509292   58767442
  2    24598892   14730385   39329277
  3    10615129   10016276   20631405
  4    10617349   10026637   20643986
  5    10600334    9976625   20576959
  6    10624009   10069984   20693993
  7    10609040   10016174   20625214
  8    25864915   15136263   41001178
  9    39847163   19809974   59657137
 10    29981232   16139823   46121055
 11    10555174    9870567   20425741
 12    25975381   15171559   41146940
 13    24311523   14490089   38801612
 14    10512252    9686540   20198792
 15    10520211    9693305   20213516
 16    10523458    9720930   20244388
 17    10576840    9917756   20494596
 18    11048180    9528808   20576988
 19    11500654    9530853   21031507
 20    11264789    9746040   21010829
于 2013-04-28T14:53:59.380 回答