2

我一直在尝试(我的尝试)一个无锁的多生产者多消费者环形缓冲区。这个想法的基础是使用 unsigned char 和 unsigned short 类型的固有溢出,将元素缓冲区固定为这些类型中的任何一种,然后你有一个自由循环回到环形缓冲区的开头。

问题是 - 我的解决方案不适用于多个生产者(尽管它适用于 N 个消费者,也适用于单个生产者单个消费者)。

#include <atomic>

template<typename Element, typename Index = unsigned char> struct RingBuffer
{
  std::atomic<Index> readIndex;
  std::atomic<Index> writeIndex;
  std::atomic<Index> scratchIndex;
  Element elements[1 << (sizeof(Index) * 8)];

  RingBuffer() :
    readIndex(0),
    writeIndex(0),
    scratchIndex(0)
  {
    ;
  }

  bool push(const Element & element)
  {
    while(true)
    {
      const Index currentReadIndex = readIndex.load();
      Index currentWriteIndex = writeIndex.load();
      const Index nextWriteIndex = currentWriteIndex + 1;
      if(nextWriteIndex == currentReadIndex)
      {
        return false;
      }

      if(scratchIndex.compare_exchange_strong(
        currentWriteIndex, nextWriteIndex))
      {
        elements[currentWriteIndex] = element;
        writeIndex = nextWriteIndex;
        return true;
      }
    }
  }

  bool pop(Element & element)
  {
    Index currentReadIndex = readIndex.load();

    while(true)
    {
      const Index currentWriteIndex = writeIndex.load();
      const Index nextReadIndex = currentReadIndex + 1;
      if(currentReadIndex == currentWriteIndex)
      {
        return false;
      }

      element = elements[currentReadIndex];

      if(readIndex.compare_exchange_strong(
        currentReadIndex, nextReadIndex))
      {
        return true;
      }
    }
  }
};

写入的主要思想是使用临时索引“scratchIndex”,它充当伪锁,以在任何时候只允许一个生产者复制构造到元素缓冲区,然后更新 writeIndex 并允许任何其他生产者取得进展. 在我因暗示我的方法是“无锁”而被称为异教徒之前,我意识到这种方法并不是完全无锁的,但在实践中(如果可行的话!)它比使用普通互斥锁要快得多!

我在这里知道一个(更复杂的)MPMC ringbuffer 解决方案http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue,但我真的在尝试我的想法然后比较反对这种方法并找出每种方法的优点(或者实际上我的方法是否完全失败了!)。

我尝试过的事情;

  • 使用 compare_exchange_weak
  • 使用与我想要的行为匹配的更精确的 std::memory_order
  • 在我拥有的各种索引之间添加缓存线垫
  • 制作元素 std::atomic 而不仅仅是元素数组

我确信这归结为我脑海中的一个基本段错误,即如何使用原子访问来绕过使用互斥体,我将完全感谢谁能指出哪些神经元在我的脑海中严重失灵!:)

4

2 回答 2

2

This is a form of the A-B-A problem. A successful producer looks something like this:

  1. load currentReadIndex
  2. load currentWriteIndex
  3. cmpxchg store scratchIndex = nextWriteIndex
  4. store element
  5. store writeIndex = nextWriteIndex

If a producer stalls for some reason between steps 2 and 3 for long enough, it is possible for the other producers to produce an entire queue's worth of data and wrap back around to the exact same index so that the compare-exchange in step 3 succeeds (because scratchIndex happens to be equal to currentWriteIndex again).

By itself, that isn't a problem. The stalled producer is perfectly within its rights to increment scratchIndex to lock the queue—even if a magical ABA-detecting cmpxchg rejected the store, the producer would simply try again, reload exactly the same currentWriteIndex, and proceed normally.

The actual problem is the nextWriteIndex == currentReadIndex check between steps 2 and 3. The queue is logically empty if currentReadIndex == currentWriteIndex, so this check exists to make sure that no producer gets so far ahead that it overwrites elements that no consumer has popped yet. It appears to be safe to do this check once at the top, because all the consumers should be "trapped" between the observed currentReadIndex and the observed currentWriteIndex.

Except that another producer can come along and bump up the writeIndex, which frees the consumer from its trap. If a producer stalls between steps 2 and 3, when it wakes up the stored value of readIndex could be absolutely anything.

Here's an example, starting with an empty queue, that shows the problem happening:

  1. Producer A runs steps 1 and 2. Both loaded indices are 0. The queue is empty.
  2. Producer B interrupts and produces an element.
  3. Consumer pops an element. Both indices are 1.
  4. Producer B produces 255 more elements. The write index wraps around to 0, the read index is still 1.
  5. Producer A awakens from its slumber. It had previously loaded both read and write indices as 0 (empty queue!), so it attempts step 3. Because the other producer coincidentally paused on index 0, the compare-exchange succeeds, and the store progresses. At completion the producer lets writeIndex = 1, and now both stored indices are 1, and the queue is logically empty. A full queue's worth of elements will now be completely ignored.

(I should mention that the only reason I can get away with talking about "stalling" and "waking up" is that all the atomics used are sequentially consistent, so I can pretend that we're in a single-threaded environment.)


Note that the way that you are using scratchIndex to guard concurrent writes is essentially a lock; whoever successfully completes the cmpxchg gets total write access to the queue until it releases the lock. The simplest way to fix this failure is to just replace scratchIndex with a spinlock—it won't suffer from A-B-A and it's what's actually happening.

于 2014-09-01T16:05:14.660 回答
1
 bool push(const Element & element)
  {
    while(true)
    {
      const Index currentReadIndex = readIndex.load();
      Index currentWriteIndex = writeIndex.load();
      const Index nextWriteIndex = currentWriteIndex + 1;
      if(nextWriteIndex == currentReadIndex)
      {
        return false;
      }

      if(scratchIndex.compare_exchange_strong(
        currentWriteIndex, nextWriteIndex))
      {
        elements[currentWriteIndex] = element;
        // Problem here!
        writeIndex = nextWriteIndex;
        return true;
      }
    }
  }

我已经标记了有问题的地方。多个线程可以同时到达 writeIndex = nextWriteIndex。数据将以任何顺序写入,尽管每次写入都是原子的。

这是一个问题,因为您尝试使用相同的原子条件更新两个值,这通常是不可能的。假设您的方法的其余部分很好,解决此问题的一种方法是将 scratchIndex 和 writeIndex 组合成一个双倍大小的值。例如,将两个 uint32_t 值视为一个 uint64_t 值并对其进行原子操作。

于 2014-09-01T12:17:33.803 回答