很有趣的问题!比我最初想象的要复杂得多:-) 我喜欢无锁解决方案,所以我尝试在下面解决一个问题。
有很多方法可以考虑这个系统。您可以将其建模为固定大小的循环缓冲区/队列(有两个条目),但随后您将失去更新下一个可用值以供消费的能力,因为您不知道消费者是否已开始阅读最近已发布的值或仍在(可能)阅读前一个值。因此,除了标准环形缓冲区之外,还需要额外的状态才能达到更优的解决方案。
首先请注意,生产者始终可以在任何给定时间点安全地写入一个单元格;如果消费者正在读取一个单元格,则可以写入另一个单元格。让我们称可以安全地写入“活动”单元的单元(可以潜在读取的单元是任何不是
活动的单元)。仅当当前未从其他单元读取时,才能切换活动单元。
与始终可以写入的活动单元格不同,非活动单元格只有在包含值时才能被读取;一旦该值被消耗,它就消失了。(这意味着在激进的生产者的情况下避免了活锁;在某些时候,消费者将清空一个单元格并将停止接触这些单元格。一旦发生这种情况,生产者肯定可以发布一个值,而在此之前,如果消费者不在读取过程中,它只能发布一个值(更改活动单元格)。)
如果有一个值可以被消费,只有消费者可以改变这个事实(对于非活动单元,无论如何);后续生产可能会更改哪个单元格处于活动状态以及发布的值,但一个值将始终准备好读取,直到它被消耗。
一旦生产者完成对活动单元格的写入,它可以通过更改哪个单元格是活动单元格(交换索引)来“发布”这个值,前提是消费者不在读取另一个单元格的过程中。如果消费者正在读取另一个单元格,则交换不会发生,但在这种情况下,消费者可以在读取完值后进行交换,前提是生产者不在写入中间(如果是,生产者将在完成后交换)。事实上,一般来说,消费者在完成读取后总是可以交换(如果它是唯一一个访问系统的人),因为消费者的虚假交换是良性的:如果另一个单元格中有东西,那么交换将导致它被读取接下来,如果没有,
因此,我们需要一个共享变量来跟踪活动单元格是什么,并且我们还需要一种方法让生产者和消费者都指示他们是否处于操作的中间。我们可以将这三个状态存储到一个原子变量中,以便能够一次(原子地)影响它们。我们还需要一种方法让消费者首先检查非活动单元格中是否有任何东西,并让两个线程适当地修改该状态。我尝试了其他一些方法,但最后最简单的方法就是将此信息也包含在其他原子变量中。这使得事情更容易推理,因为系统中的所有状态变化都是原子的。
我想出了一个无等待的实现(无锁,所有操作在有限数量的指令中完成)。
代码时间!
#include <atomic>
#include <cstdint>
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() : m_state(0) { }
~ProducerConsumerDoubleBuffer() { }
// Never returns nullptr
T* start_writing() {
// Increment active users; once we do this, no one
// can swap the active cell on us until we're done
auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
return &m_buf[state & 1];
}
void end_writing() {
// We want to swap the active cell, but only if we were the last
// ones concurrently accessing the data (otherwise the consumer
// will do it for us when *it's* done accessing the data)
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
if ((state & 0x6) == 0) {
// The consumer wasn't in the middle of a read, we should
// swap (unless the consumer has since started a read or
// already swapped or read a value and is about to swap).
// If we swap, we also want to clear the full flag on what
// will become the active cell, otherwise the consumer could
// eventually read two values out of order (it reads a new
// value, then swaps and reads the old value while the
// producer is idle).
m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
}
}
// Returns nullptr if there appears to be no more data to read yet
T* start_reading() {
m_readState = m_state.load(std::memory_order_relaxed);
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// Nothing to read here!
return nullptr;
}
// At this point, there is guaranteed to be something to
// read, because the full flag is never turned off by the
// producer thread once it's on; the only thing that could
// happen is that the active cell changes, but that can
// only happen after the producer wrote a value into it,
// in which case there's still a value to read, just in a
// different cell.
m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;
// Now that we've incremented the user count, nobody can swap until
// we decrement it
return &m_buf[(m_readState & 1) ^ 1];
}
void end_reading() {
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// There was nothing to read; shame to repeat this
// check, but if these functions are inlined it might
// not matter. Otherwise the API could be changed.
// Or just don't call this method if start_reading()
// returns nullptr -- then you could also get rid
// of m_readState.
return;
}
// Alright, at this point the active cell cannot change on
// us, but the active cell's flag could change and the user
// count could change. We want to release our user count
// and remove the flag on the value we read.
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
// Oi, we were the last ones accessing the data when we released our cell.
// That means we should swap, but only if the producer isn't in the middle
// of producing something, and hasn't already swapped, and hasn't already
// set the flag we just reset (which would mean they swapped an even number
// of times). Note that we don't bother swapping if there's nothing to read
// in the other cell.
m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
}
}
private:
T m_buf[2];
// The bottom (lowest) bit will be the active cell (the one for writing).
// The active cell can only be switched if there's at most one concurrent
// user. The next two bits of state will be the number of concurrent users.
// The fourth bit indicates if there's a value available for reading
// in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
std::atomic<std::uint32_t> m_state;
std::uint32_t m_readState;
};
请注意,语义是这样的,消费者永远不能两次读取给定值,并且它读取的值总是比它读取的最后一个值更新。它在内存使用方面也相当有效(两个缓冲区,就像您的原始解决方案一样)。我避免使用 CAS 循环,因为它们通常比竞争下的单个原子操作效率低。
如果您决定使用上面的代码,我建议您先为其编写一些全面的(线程)单元测试。和适当的基准。我确实测试了它,但只是勉强。如果您发现任何错误,请告诉我:-)
我的单元测试:
ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_writing();
if (item != nullptr) { // Always true
*item = i;
}
buf.end_writing();
}
});
std::thread consumer([&]() {
int prev = -1;
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_reading();
if (item != nullptr) {
assert(*item > prev);
prev = *item;
}
buf.end_reading();
}
});
producer.join();
consumer.join();
至于您最初的实现,我只是粗略地看了看(设计新东西更有趣,呵呵),但 david.pfx 的回答似乎解决了您问题的这一部分。