28

我在 $work 有一个应用程序,我必须在两个以不同频率调度的实时线程之间移动。(实际的调度超出了我的控制。)应用程序是硬实时的(其中一个线程必须驱动硬件接口),所以线程之间的数据传输应该是无锁和无等待的可能的程度。

需要注意的是,只需要传输一个数据块:因为两个线程运行的速率不同,所以在较慢线程的两次唤醒之间会有两次较快线程的迭代完成的时候;在这种情况下,可以覆盖写入缓冲区中的数据,以便较慢的线程仅获取最新数据。

换句话说,代替队列,双缓冲解决方案就足够了。这两个缓冲区是在初始化期间分配的,读写线程可以调用类的方法来获取指向这些缓冲区之一的指针。

C++ 代码:

#include <mutex>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() {
        m_write_busy = false;
        m_read_idx = m_write_idx = 0;
    }

    ~ProducerConsumerDoubleBuffer() { }

    // The writer thread using this class must call
    // start_writing() at the start of its iteration
    // before doing anything else to get the pointer
    // to the current write buffer.
    T * start_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = true;
        m_write_idx = 1 - m_read_idx;

        return &m_buf[m_write_idx];
    }
    // The writer thread must call end_writing()
    // as the last thing it does
    // to release the write busy flag.
    void end_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = false;
    }

    // The reader thread must call start_reading()
    // at the start of its iteration to get the pointer
    // to the current read buffer.
    // If the write thread is not active at this time,
    // the read buffer pointer will be set to the 
    // (previous) write buffer - so the reader gets the latest data.
    // If the write buffer is busy, the read pointer is not changed.
    // In this case the read buffer may contain stale data,
    // it is up to the user to deal with this case.
    T * start_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (!m_write_busy) {
            m_read_idx = m_write_idx;
        }

        return &m_buf[m_read_idx];
    }
    // The reader thread must call end_reading()
    // at the end of its iteration.
    void end_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_read_idx = m_write_idx;
    }

private:
    T m_buf[2];
    bool m_write_busy;
    unsigned int m_read_idx, m_write_idx;
    std::mutex m_mutex;
};

为了避免读取器线程中的数据过时,有效负载结构是版本化的。为了促进线程之间的双向数据传输,使用了上述怪物的两个实例,方向相反。

问题:

  • 这个方案是线程安全的吗?如果坏了,在哪里?
  • 可以在没有互斥锁的情况下完成吗?也许只有内存屏障或 CAS 指令?
  • 可以做得更好吗?
4

3 回答 3

12

很有趣的问题!比我最初想象的要复杂得多:-) 我喜欢无锁解决方案,所以我尝试在下面解决一个问题。

有很多方法可以考虑这个系统。您可以将其建模为固定大小的循环缓冲区/队列(有两个条目),但随后您将失去更新下一个可用值以供消费的能力,因为您不知道消费者是否已开始阅读最近已发布的值或仍在(可能)阅读前一个值。因此,除了标准环形缓冲区之外,还需要额外的状态才能达到更优的解决方案。

首先请注意,生产者始终可以在任何给定时间点安全地写入一个单元格;如果消费者正在读取一个单元格,则可以写入另一个单元格。让我们称可以安全地写入“活动”单元的单元(可以潜在读取的单元是任何不是 活动的单元)。仅当当前未从其他单元读取时,才能切换活动单元。

与始终可以写入的活动单元格不同,非活动单元格只有在包含值​​时才能被读取;一旦该值被消耗,它就消失了。(这意味着在激进的生产者的情况下避免了活锁;在某些时候,消费者将清空一个单元格并将停止接触这些单元格。一旦发生这种情况,生产者肯定可以发布一个值,而在此之前,如果消费者不在读取过程中,它只能发布一个值(更改活动单元格)。)

如果有一个值可以被消费,只有消费者可以改变这个事实(对于非活动单元,无论如何);后续生产可能会更改哪个单元格处于活动状态以及发布的值,但一个值将始终准备好读取,直到它被消耗。

一旦生产者完成对活动单元格的写入,它可以通过更改哪个单元格是活动单元格(交换索引)来“发布”这个值,前提是消费者不在读取另一个单元格的过程中。如果消费者正在读取另一个单元格,则交换不会发生,但在这种情况下,消费者可以在读取完值后进行交换前提是生产者不在写入中间(如果是,生产者将在完成后交换)。事实上,一般来说,消费者在完成读取后总是可以交换(如果它是唯一一个访问系统的人),因为消费者的虚假交换是良性的:如果另一个单元格中有东西,那么交换将导致它被读取接下来,如果没有,

因此,我们需要一个共享变量来跟踪活动单元格是什么,并且我们还需要一种方法让生产者和消费者都指示他们是否处于操作的中间。我们可以将这三个状态存储到一个原子变量中,以便能够一次(原子地)影响它们。我们还需要一种方法让消费者首先检查非活动单元格中是否有任何东西,并让两个线程适当地修改该状态。我尝试了其他一些方法,但最后最简单的方法就是将此信息也包含在其他原子变量中。这使得事情更容易推理,因为系统中的所有状态变化都是原子的。

我想出了一个无等待的实现(无锁,所有操作在有限数量的指令中完成)。

代码时间!

#include <atomic>
#include <cstdint>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() : m_state(0) { }
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
        // Increment active users; once we do this, no one
        // can swap the active cell on us until we're done
        auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
        return &m_buf[state & 1];
    }

    void end_writing() {
        // We want to swap the active cell, but only if we were the last
        // ones concurrently accessing the data (otherwise the consumer
        // will do it for us when *it's* done accessing the data)

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
        state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
        if ((state & 0x6) == 0) {
            // The consumer wasn't in the middle of a read, we should
            // swap (unless the consumer has since started a read or
            // already swapped or read a value and is about to swap).
            // If we swap, we also want to clear the full flag on what
            // will become the active cell, otherwise the consumer could
            // eventually read two values out of order (it reads a new
            // value, then swaps and reads the old value while the
            // producer is idle).
            m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
        }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
        m_readState = m_state.load(std::memory_order_relaxed);
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // Nothing to read here!
            return nullptr;
        }

        // At this point, there is guaranteed to be something to
        // read, because the full flag is never turned off by the
        // producer thread once it's on; the only thing that could
        // happen is that the active cell changes, but that can
        // only happen after the producer wrote a value into it,
        // in which case there's still a value to read, just in a
        // different cell.

        m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

        // Now that we've incremented the user count, nobody can swap until
        // we decrement it
        return &m_buf[(m_readState & 1) ^ 1];
    }

    void end_reading() {
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // There was nothing to read; shame to repeat this
            // check, but if these functions are inlined it might
            // not matter. Otherwise the API could be changed.
            // Or just don't call this method if start_reading()
            // returns nullptr -- then you could also get rid
            // of m_readState.
            return;
        }

        // Alright, at this point the active cell cannot change on
        // us, but the active cell's flag could change and the user
        // count could change. We want to release our user count
        // and remove the flag on the value we read.

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
        state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
        if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
            // Oi, we were the last ones accessing the data when we released our cell.
            // That means we should swap, but only if the producer isn't in the middle
            // of producing something, and hasn't already swapped, and hasn't already
            // set the flag we just reset (which would mean they swapped an even number
            // of times).  Note that we don't bother swapping if there's nothing to read
            // in the other cell.
            m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
        }
    }

private:
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there's at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there's a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    std::uint32_t m_readState;
};

请注意,语义是这样的,消费者永远不能两次读取给定值,并且它读取的值总是比它读取的最后一个值更新。它在内存使用方面也相当有效(两个缓冲区,就像您的原始解决方案一样)。我避免使用 CAS 循环,因为它们通常比竞争下的单个原子操作效率低。

如果您决定使用上面的代码,我建议您先为其编写一些全面的(线程)单元测试。和适当的基准。我确实测试了它,但只是勉强。如果您发现任何错误,请告诉我:-)

我的单元测试:

ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_writing();
        if (item != nullptr) {      // Always true
            *item = i;
        }
        buf.end_writing();
    }
});
std::thread consumer([&]() {
    int prev = -1;
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_reading();
        if (item != nullptr) {
            assert(*item > prev);
            prev = *item;
        }
        buf.end_reading();
    }
});
producer.join();
consumer.join();

至于您最初的实现,我只是粗略地看了看(设计新东西更有趣,呵呵),但 david.pfx 的回答似乎解决了您问题的这一部分。

于 2014-05-17T16:44:53.477 回答
4

是的,我认为它坏了。

如果读取器连续执行开始/结束/开始,它将将其读取索引更新为写入索引,并可能从写入索引读取数据,即使写入繁忙也是如此。

问题本质上是写入者不知道读取器将使用哪个缓冲区,因此写入者应确保两个缓冲区始终有效。如果需要任何时间将数据写入缓冲区,它就无法做到这一点[除非我误解了此处未显示的一些逻辑。]

是的,我认为它可以在没有锁的情况下使用 CAS 或等效逻辑来完成。我不会尝试在这个空间中表达算法。我确信它存在,但不是我可以第一次正确地写出来。一些网络搜索出现了一些看似合理的候选人。使用 CAS 的无等待 IPC 似乎是一个非常有趣的话题和一些研究的主题。


经过进一步思考,算法如下。你需要:

  • 3 个缓冲区:一个供作者使用,一个供读者使用,一个额外。缓冲区是有序的:它们形成一个环(但见注释)。
  • 每个缓冲区的状态:空闲、已满、正在写入、正在读取。
  • 一个可以检查缓冲区状态并在单个原子操作中有条件地将状态更改为不同值的函数。我将为此使用 CSET。

作家:

Find the first buffer that is FREE or FULL
  Fail: assert (should never fail, reader can only use one buffer)
  CSET buffer to WRITING
Write into the buffer
CSET buffer to FULL

读者:

Find first buffer that is FULL
    Fail: wait (writer may be slow)
    CSET buffer to READING
Read and consume buffer
CSET buffer to FREE

注意:此算法不保证缓冲区按到达顺序严格处理,并且没有简单的更改可以做到这一点。如果这很重要,则应该使用缓冲区上的序列号来增强算法,由写入器设置,以便读取器可以选择最近的缓冲区。

我将代码保留为实现细节。


CSET 函数很重要。它必须以原子方式测试特定共享内存位置是否等于预期值,如果是,则将其更改为新值。如果成功进行更改,则返回 true,否则返回 false。如果两个线程同时访问相同的位置(并且可能在不同的处理器上),实现必须避免竞争条件。

C++ 标准原子操作库包含一组 atomic_compare_exchange 函数,如果可用的话,它们应该服务于该目的。

于 2014-05-16T05:28:07.020 回答
0

这是一个使用InterlockedExchangePointer()和 SLIST 的版本。

此解决方案不支持重新读取最后一个缓冲区。但如果需要,可以通过副本和if( NULL == doubleBuffer.beginReader(...) ) { use backup copy ... }.
这样做不是因为它很难添加,而是因为它不是很现实。想象一下,您的最后一个已知值变得越来越老——几秒钟、几天、几周。应用程序不太可能仍然想要使用它。因此,将重读功能分解为双缓冲代码会剥夺应用程序的灵活性。

双缓冲区有 1 个读指针成员。每当调用 beginRead() 时,都会返回此值并自动替换为 NULL。把它想象成“读者接受缓冲区”。
使用endRead(),读取器返回缓冲区并将其添加到 SLIST,其中包含用于写入操作的可用缓冲区。

最初,两个缓冲区都被添加到 SLIST,读取指针为 NULL。

beginWrite()从 SLIST 中弹出下一个可用缓冲区。而且这个值永远不能为 NULL,因为实现的方式endWrite()

最后同样重要的是,endWrite()将读取指针与返回的新写入缓冲区原子交换,如果读取指针不为 NULL,则将其推送到 SLIST。

因此,即使读取端从不读取,写入端也永远不会耗尽缓冲区。当读者阅读时,它会得到最新的已知值(一次!)。

如果有多个并发读取器或写入器,则此实现不安全。但这并不是最初的目标。

在丑陋的一面,缓冲区需要是顶部带有一些 SLIST_HEADER 成员的结构。

在这里,代码,但请记住,如果你的火星探测器降落在金星上,这不是我的错!

const size_t MAX_DATA_SIZE = 512;
typedef
//__declspec(align(MEMORY_ALLOCATION_ALIGNMENT))
struct DataItem_tag
{
    SLIST_ENTRY listNode;
    uint8_t data[MAX_DATA_SIZE];
    size_t length;
} DataItem_t;

class CDoubleBuffer
{
    SLIST_HEADER m_writePointers;
    DataItem_t m_buffers[2];
    volatile DataItem_t *m_readPointer;

public:
    CDoubleBuffer()
        : m_writePointers()
        , m_buffers()
        , m_readPointer(NULL)
    {
        InitializeSListHead(&m_writePointers);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[0].listNode);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[1].listNode);
    }
    DataItem_t *beginRead()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, NULL));
        return result;
    }
    void endRead(DataItem_t *dataItem)
    {
        if (NULL != dataItem)
        {
            InterlockedPushEntrySList(&m_writePointers, &dataItem->listNode);
        }
    }
    DataItem_t *beginWrite()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedPopEntrySList(&m_writePointers));
        return result;
    }
    void endWrite(DataItem_t *dataItem)
    {
        DataItem_t *oldReadPointer = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, dataItem));
        if (NULL != oldReadPointer)
        {
            InterlockedPushEntrySList(&m_writePointers, &oldReadPointer->listNode);
        }
    }
};

这里是它的测试代码。(对于上面的代码和测试代码,您需要 <windows.h> 和 <assert.h>。)

CDoubleBuffer doubleBuffer;

DataItem_t *readValue;
DataItem_t *writeValue;

// nothing to read yet. Make sure NULL is returned.
assert(NULL == doubleBuffer.beginRead());
doubleBuffer.endRead(NULL); // we got nothing, we return nothing.

// First write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 0;
doubleBuffer.endWrite(writeValue);

// Second write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 1;
doubleBuffer.endWrite(writeValue);

// Third write without read - works because it reuses the old buffer for the new write.
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 2;
doubleBuffer.endWrite(writeValue);

readValue = doubleBuffer.beginRead();
assert(NULL != readValue); // NULL would obviously be a terrible bug.
assert(2 == readValue->length); // We got the latest and greatest?
doubleBuffer.endRead(readValue);

readValue = doubleBuffer.beginRead();
assert(NULL == readValue); // We expect NULL here. Re-reading is not a feature of this implementation!
doubleBuffer.endRead(readValue);
于 2015-01-25T04:37:49.250 回答