下面是一个多读/写共享数据的尝试,它使用 std::atomics 和忙等待而不是互斥锁和条件变量来在读写器之间同步。我很困惑为什么那里的断言被击中。我确定逻辑中的某个地方存在错误,但我不确定它在哪里。
实现背后的想法是,读取的线程一直在旋转,直到写入器完成写入。当他们进入读取函数时,他们会增加 m_numReaders 计数,而当他们等待写入者时,他们会增加 m_numWaiting 计数。
这个想法是,如果 m_numWaiting 总是在 m_numReaders 之后递增并在 m_numReaders 之前递减,则 m_numWaiting 应该总是小于或等于 m_numReaders。
不应该出现 m_numWaiting 大于 m_numReaders (或者我没有看到)的情况,因为读取器总是首先增加读取器计数器,并且有时只会增加等待计数器,而等待计数器总是首先减少。
然而,这似乎是正在发生的事情,因为断言正在被击中。如果您看到了,有人可以指出逻辑错误吗?
谢谢!
#include <iostream>
#include <thread>
#include <assert.h>
template<typename T>
class ReadWrite
{
public:
ReadWrite() : m_numReaders(0), m_numWaiting(0), m_writing(false)
{
m_writeFlag.clear();
}
template<typename functor>
void read(functor& readFunc)
{
m_numReaders++;
std::atomic<bool>waiting(false);
while (m_writing)
{
if(!waiting)
{
m_numWaiting++; // m_numWaiting should always be increased after m_numReaders
waiting = true;
}
}
assert(m_numWaiting <= m_numReaders);
readFunc(&m_data);
assert(m_numWaiting <= m_numReaders); // <-- These asserts get hit ?
if(waiting)
{
m_numWaiting--; // m_numWaiting should always be decreased before m_numReaders
}
m_numReaders--;
assert(m_numWaiting <= m_numReaders); // <-- These asserts get hit ?
}
//
// Only a single writer can operate on this at any given time.
//
template<typename functor>
void write(functor& writeFunc)
{
while (m_writeFlag.test_and_set());
// Ensure no readers present
while (m_numReaders);
// At this point m_numReaders may have been increased !
m_writing = true;
// If a reader entered before the writing flag was set, wait for
// it to finish
while (m_numReaders > m_numWaiting);
writeFunc(&m_data);
m_writeFlag.clear();
m_writing = false;
}
private:
T m_data;
std::atomic<int64_t> m_numReaders;
std::atomic<int64_t> m_numWaiting;
std::atomic<bool> m_writing;
std::atomic_flag m_writeFlag;
};
int main(int argc, const char * argv[])
{
const size_t numReaders = 2;
const size_t numWriters = 1;
const size_t numReadWrites = 10000000;
std::thread readThreads[numReaders];
std::thread writeThreads[numWriters];
ReadWrite<int> dummyData;
auto writeFunc = [&](int* pData) { return; }; // intentionally empty
auto readFunc = [&](int* pData) { return; }; // intentionally empty
auto readThreadProc = [&]()
{
size_t numReads = numReadWrites;
while (numReads--)
{
dummyData.read(readFunc);
}
};
auto writeThreadProc = [&]()
{
size_t numWrites = numReadWrites;
while (numWrites--)
{
dummyData.write(writeFunc);
}
};
for (std::thread& thread : writeThreads) { thread = std::thread(writeThreadProc);}
for (std::thread& thread : readThreads) { thread = std::thread(readThreadProc);}
for (std::thread& thread : writeThreads) { thread.join();}
for (std::thread& thread : readThreads) { thread.join();}
}