我最近使用三重缓冲区的 std::atomic 移植到 C++11,用作并发同步机制。这种线程同步方法背后的想法是,对于生产者-消费者的情况,你有一个运行得更快的生产者消费者,三重缓冲可以带来一些好处,因为生产者线程不会因为不得不等待消费者而“减慢”速度。在我的例子中,我有一个以~120fps 更新的物理线程和一个以~60fps 运行的渲染线程。显然,我希望渲染线程总是尽可能获得最新的状态,但我也知道我会从物理线程中跳过很多帧,因为速率不同。另一方面,我希望我的物理线程保持其恒定的更新率,并且不受锁定数据的较慢渲染线程的限制。
原始的 C 代码是由 remis-thoughts 编写的,完整的解释在他的博客中。我鼓励任何有兴趣阅读它以进一步了解原始实现的人。
我的实现可以在这里找到。
基本思想是拥有一个具有 3 个位置(缓冲区)和一个原子标志的数组,该标志通过比较和交换来定义在任何给定时间哪些数组元素对应于什么状态。这样,只有一个原子变量用于对数组的所有 3 个索引和三重缓冲背后的逻辑进行建模。缓冲区的 3 个位置被命名为 Dirty、Clean 和 Snap。Producer总是写入 Dirty 索引,并且可以翻转 writer 以将 Dirty 与当前的 Clean 索引交换。消费者可以请求一个新的Snap,它将当前的 Snap 索引与 Clean 索引交换以获得最新的缓冲区。消费者总是在Snap 位置读取缓冲区。
该标志由一个 8 位无符号整数组成,这些位对应于:
(未使用)(新写入)(2x 脏)(2x 干净)(2x 快照)
newWrite 额外位标志由写入器设置并由读取器清除。读者可以使用它来检查自上次快照以来是否有任何写入,如果没有,则不会再进行一次快照。可以使用简单的按位运算获得标志和索引。
现在好了,代码:
template <typename T>
class TripleBuffer
{
public:
TripleBuffer<T>();
TripleBuffer<T>(const T& init);
// non-copyable behavior
TripleBuffer<T>(const TripleBuffer<T>&) = delete;
TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;
T snap() const; // get the current snap to read
void write(const T newT); // write a new value
bool newSnap(); // swap to the latest value, if any
void flipWriter(); // flip writer positions dirty / clean
T readLast(); // wrapper to read the last available element (newSnap + snap)
void update(T newT); // wrapper to update with a new element (write + flipWriter)
private:
bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes
// 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
// newWrite = (flags & 0x40)
// dirtyIndex = (flags & 0x30) >> 4
// cleanIndex = (flags & 0xC) >> 2
// snapIndex = (flags & 0x3)
mutable atomic_uint_fast8_t flags;
T buffer[3];
};
执行:
template <typename T>
TripleBuffer<T>::TripleBuffer(){
T dummy = T();
buffer[0] = dummy;
buffer[1] = dummy;
buffer[2] = dummy;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){
buffer[0] = init;
buffer[1] = init;
buffer[2] = init;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
T TripleBuffer<T>::snap() const{
return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}
template <typename T>
void TripleBuffer<T>::write(const T newT){
buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}
template <typename T>
bool TripleBuffer<T>::newSnap(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do {
if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
return false;
} while(!flags.compare_exchange_weak(flagsNow,
swapSnapWithClean(flagsNow),
memory_order_release,
memory_order_consume));
return true;
}
template <typename T>
void TripleBuffer<T>::flipWriter(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow,
newWriteSwapCleanWithDirty(flagsNow),
memory_order_release,
memory_order_consume));
}
template <typename T>
T TripleBuffer<T>::readLast(){
newSnap(); // get most recent value
return snap(); // return it
}
template <typename T>
void TripleBuffer<T>::update(T newT){
write(newT); // write new value
flipWriter(); // change dirty/clean buffer positions for the next update
}
template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
// check if the newWrite bit is 1
return ((flags & 0x40) != 0);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
// swap snap with clean
return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
// set newWrite bit to 1 and swap clean with dirty
return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}
如您所见,我决定使用Release-Consume模式进行内存排序。存储的释放(memory_order_release) 确保当前线程中的写入不能在存储之后重新排序。另一方面,Consume确保当前线程中依赖于当前加载的值的读取不会在此加载之前重新排序。这确保了对释放相同原子变量的其他线程中的因变量的写入在当前线程中是可见的。
如果我的理解是正确的,因为我只需要原子设置标志,编译器可以自由地重新排序不直接影响标志的其他变量的操作,从而进行更多优化。通过阅读一些关于新内存模型的文档,我也知道这些宽松的原子只会在 ARM 和 POWER 等平台上产生显着影响(它们主要是因为它们而引入的)。由于我的目标是 ARM,我相信我可以从这些操作中受益,并且能够挤出更多的性能。
现在的问题:
对于这个特定问题,我是否正确使用了 Release-Consume 宽松排序?
谢谢,
安德烈
PS:对不起,很长的帖子,但我认为需要一些体面的上下文才能更好地了解问题。
编辑: 实施@Yakk的建议:
- 修复
flags
了 read onnewSnap()
和flipWriter()
使用直接赋值的问题,因此使用 defaultload(std::memory_order_seq_cst)
。 - 为清晰起见,将位摆弄操作移至专用功能。
- 添加
bool
了返回类型newSnap()
,现在在没有新内容时返回 false,否则返回 true。 - 使用习惯用法将类定义为不可
= delete
复制,因为如果使用了复制构造函数和赋值构造函数,则它们都是不安全的TripleBuffer
。
编辑 2: 修复了不正确的描述(感谢@Useless)。是消费者请求一个新的 Snap 并从 Snap 索引中读取数据(而不是“作者”)。抱歉打扰了,感谢无用的指出。
编辑 3:根据@Display Name 的建议
优化了newSnap()
and功能,有效地去除了每个循环周期的 2 个冗余 's。flipriter()
load()