9

我正在尝试找到一种方法来制作无锁或非阻塞方式来为单个消费者/单个消费者创建一个环形缓冲区,这将覆盖缓冲区中最旧的数据。如果缓冲区已满,我已经阅读了很多无锁算法,当您“返回 false”时它们会起作用——即,不要添加;但是当您需要覆盖最旧的数据时,我什至找不到讨​​论如何执行此操作的伪代码。

我正在使用 GCC 4.1.2(工作限制,我无法升级版本......)并且我有 Boost 库,过去我制作了自己的 Atomic< T > 变量类型,它非常接近于upcomming 规范(它并不完美,但它是线程安全的并且可以满足我的需求)。

当我想到它时,我认为使用这些原子应该真正解决这个问题。关于我在想什么的一些粗略的伪代码:

template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;

unsigned int getNextIndex(unsigned int i)
{
 return (i + 1 ) % size;
}

public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
 if(writeIndex == getNextIndex(readIndex)) //1
 {
  readIndex = getNextIndex(readIndex); //2
  }
  buf[writeIndex] = t;
  writeIndex = getNextIndex(writeIndex);  //3
}

bool consume(T& t)
{
  if(readIndex == writeIndex)  //4
   return false;
  t = buf[readIndex];  
  readIndex = getNexIndex(readIndex);  //5
  return true;
}

};

据我所知,这里没有死锁情况,所以我们可以避免这种情况(如果我上面的实现即使在伪代码级别上也是错误的,那么建设性的批评总是值得赞赏的)。但是,我能找到的 BIG 比赛条件是:

让我们假设缓冲区已满。即writeIndex +1 = readIndex;(1) 发生,就像调用 consumer 一样。and is true (4) is false, 所以我们移动到从缓冲区读取 (5) 发生了,并且 readIndex 提前了一个(所以实际上缓冲区中的空间 (2) 发生了,提前 readIndex AGAIN,因此失去价值。

基本上,这是作家必须修改读者的经典问题,导致竞争条件。如果我每次访问它时都没有真正阻止整个列表,我想不出一种方法来防止这种情况发生。我错过了什么??

4

3 回答 3

7
  1. 从具有适当进度保证的单个生产者/多个消费者队列开始。
  2. 如果队列已满且推送失败,则弹出一个值。然后会有空间推动新的价值。
于 2010-12-10T05:18:49.700 回答
1

我错过了什么??

很多:

  • 假设你在被生产者覆盖时消费——你是如何检测/处理的?
    • 许多选项 - 例如do {将值复制出来;使用修改序列号等检查副本是否具有完整性} while (损坏)
  • 使用原子序数是不够的——你还需要使用 CAS 样式的循环来影响索引增量(尽管我假设你知道这一点,因为你说你已经广泛阅读了这个)
  • 记忆障碍

但是,让我们将其记为低于您的伪代码级别,并考虑您的明确问题:

  • 第 (5) 点实际上需要 CAS 操作。如果 readIndexconsume()在(可能损坏的)t被复制之前被正确地采样/复制到顶部,那么如果它已经被生产者递增,则 CAS 指令将失败。而不是通常的重新采样和重试 CAS,只需继续。
于 2010-12-10T05:12:01.880 回答
0

这是我最近创建的原子变量的循环缓冲区代码。我已将其修改为“覆盖”数据而不是返回 false。免责声明 - 它尚未经过生产级测试。

    template<int capacity, int gap, typename T> class nonblockigcircular {
  /*
   * capacity - size of cicular buffer
   * gap - minimum safety distance between head and tail to start insertion operation
   *       generally gap should exceed number of threads attempting insertion concurrently 
   *       capacity should be gap plus desired buffer size 
   * T   - is a data type for buffer to keep
   */
  volatile T buf[capacity];  // buffer

  std::atomic<int> t, h, ph, wh; 
  /* t to h data available for reading
   * h to ph - zone where data is likely written but h is not updated yet
   *   to make sure data is written check if ph==wh 
   * ph to wh - zone where data changes in progress 
   */

  bool pop(T &pwk) {
    int td, tnd;

    do {
      int hd=h.load()%capacity;
      td=t.load()%capacity;
      if(hd==td) return false;
      tnd=(td+1)%capacity;
    } while(!t.compare_exchange_weak(td, tnd));

    pwk=buf[td];
    return true;
  }


  const int  count() {
    return ( h.load()+capacity-t.load() ) % capacity;
    }

  bool push(const T &pwk) {
    const int tt=t.load();
    int hd=h.load();

    if(  capacity - (hd+capacity-tt) % capacity < gap) {
       // Buffer is too full to insert
       // return false; 
       // or delete last record as below
       int nt=t.fetch_add(1);
       if(nt==capacity-1) t.fetch_sub(capacity);
       }


    int nwh=wh.fetch_add(1);
    if(nwh==capacity-1) wh.fetch_sub(capacity);

    buf[nwh%capacity]=pwk;

    int nph=ph.fetch_add(1);
    if(nph==capacity-1) ph.fetch_sub(capacity);

    if(nwh==nph) {
      int ohd=hd;
      while(! h.compare_exchange_weak(hd, nwh) ) {
        hd=h.load();
        if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
      }
    }
    return true;
  }

};
于 2016-06-17T14:31:25.037 回答