1

下面是一个多读/写共享数据的尝试,它使用 std::atomics 和忙等待而不是互斥锁和条件变量来在读写器之间同步。我很困惑为什么那里的断言被击中。我确定逻辑中的某个地方存在错误,但我不确定它在哪里。

实现背后的想法是,读取的线程一直在旋转,直到写入器完成写入。当他们进入读取函数时,他们会增加 m_numReaders 计数,而当他们等待写入者时,他们会增加 m_numWaiting 计数。

这个想法是,如果 m_numWaiting 总是在 m_numReaders 之后递增并在 m_numReaders 之前递减,则 m_numWaiting 应该总是小于或等于 m_numReaders。

不应该出现 m_numWaiting 大于 m_numReaders (或者我没有看到)的情况,因为读取器总是首先增加读取器计数器,并且有时只会增加等待计数器,而等待计数器总是首先减少。

然而,这似乎是正在发生的事情,因为断言正在被击中。如果您看到了,有人可以指出逻辑错误吗?

谢谢!

#include <iostream>
#include <thread> 
#include <assert.h>

template<typename T>
class ReadWrite
{

public:

    ReadWrite() : m_numReaders(0), m_numWaiting(0), m_writing(false)
    {
        m_writeFlag.clear();
    }

    template<typename functor>
    void read(functor& readFunc)
    {
        m_numReaders++;
        std::atomic<bool>waiting(false);
        while (m_writing)
        {
            if(!waiting)
            {
                m_numWaiting++; // m_numWaiting should always be increased after m_numReaders
                waiting = true;
            }
        }

        assert(m_numWaiting <= m_numReaders);

        readFunc(&m_data);

        assert(m_numWaiting <= m_numReaders); // <-- These asserts get hit ?

        if(waiting)
        {
            m_numWaiting--; // m_numWaiting should always be decreased before m_numReaders
        }

        m_numReaders--;

        assert(m_numWaiting <= m_numReaders); // <-- These asserts get hit ?
    }

    //
    // Only a single writer can operate on this at any given time.
    //
    template<typename functor>
    void write(functor& writeFunc)
    {
        while (m_writeFlag.test_and_set());

        // Ensure no readers present
        while (m_numReaders);

        // At this point m_numReaders may have been increased !
        m_writing = true;

        // If a reader entered before the writing flag was set, wait for
        // it to finish
        while (m_numReaders > m_numWaiting);

        writeFunc(&m_data);

        m_writeFlag.clear();
        m_writing = false;
    }
private:
    T m_data;
    std::atomic<int64_t> m_numReaders;
    std::atomic<int64_t> m_numWaiting;
    std::atomic<bool> m_writing;
    std::atomic_flag m_writeFlag;
};

int main(int argc, const char * argv[])
{
    const size_t numReaders = 2;
    const size_t numWriters = 1;
    const size_t numReadWrites = 10000000;

    std::thread readThreads[numReaders];
    std::thread writeThreads[numWriters];

    ReadWrite<int> dummyData;

    auto writeFunc = [&](int* pData)    { return; }; // intentionally empty
    auto readFunc = [&](int* pData)     { return; }; // intentionally empty

    auto readThreadProc = [&]()
    {
        size_t numReads = numReadWrites;
        while (numReads--)
        {
            dummyData.read(readFunc);
        }
    };

    auto writeThreadProc = [&]()
    {
        size_t numWrites = numReadWrites;
        while (numWrites--)
        {
            dummyData.write(writeFunc);
        }
    };

    for (std::thread& thread : writeThreads)    { thread = std::thread(writeThreadProc);}
    for (std::thread& thread : readThreads)     { thread = std::thread(readThreadProc);}
    for (std::thread& thread : writeThreads)    { thread.join();}
    for (std::thread& thread : readThreads)     { thread.join();}
}
4

0 回答 0