79

我需要一个固定大小(在创建时可以在运行时选择,而不是在编译时选择)循环缓冲区,它可以保存任何类型的对象,并且它需要非常高性能。我认为不会出现资源争用问题,因为尽管它是在多任务嵌入式环境中,但它是一个协作环境,因此任务本身可以管理它。

我最初的想法是在缓冲区中存储一个简单的结构,该结构将包含类型(简单枚举/定义)和一个指向有效负载的 void 指针,但我希望它尽可能快,所以我愿意接受涉及绕过的建议堆。

实际上,我很高兴绕过任何标准库以获得原始速度 - 从我所看到的代码来看,它并没有针对 CPU 进行大量优化:看起来他们只是为之类的东西编译了 C 代码,strcpy()没有手工编码组装。

任何代码或想法将不胜感激。所需的操作是:

  • 创建具有特定大小的缓冲区。
  • 放在尾部。
  • 从头上得到。
  • 返回计数。
  • 删除一个缓冲区。
4

9 回答 9

90

最简单的解决方案是跟踪项目大小和项目数,然后创建适当字节数的缓冲区:

typedef struct circular_buffer
{
    void *buffer;     // data buffer
    void *buffer_end; // end of data buffer
    size_t capacity;  // maximum number of items in the buffer
    size_t count;     // number of items in the buffer
    size_t sz;        // size of each item in the buffer
    void *head;       // pointer to head
    void *tail;       // pointer to tail
} circular_buffer;

void cb_init(circular_buffer *cb, size_t capacity, size_t sz)
{
    cb->buffer = malloc(capacity * sz);
    if(cb->buffer == NULL)
        // handle error
    cb->buffer_end = (char *)cb->buffer + capacity * sz;
    cb->capacity = capacity;
    cb->count = 0;
    cb->sz = sz;
    cb->head = cb->buffer;
    cb->tail = cb->buffer;
}

void cb_free(circular_buffer *cb)
{
    free(cb->buffer);
    // clear out other fields too, just to be safe
}

void cb_push_back(circular_buffer *cb, const void *item)
{
    if(cb->count == cb->capacity){
        // handle error
    }
    memcpy(cb->head, item, cb->sz);
    cb->head = (char*)cb->head + cb->sz;
    if(cb->head == cb->buffer_end)
        cb->head = cb->buffer;
    cb->count++;
}

void cb_pop_front(circular_buffer *cb, void *item)
{
    if(cb->count == 0){
        // handle error
    }
    memcpy(item, cb->tail, cb->sz);
    cb->tail = (char*)cb->tail + cb->sz;
    if(cb->tail == cb->buffer_end)
        cb->tail = cb->buffer;
    cb->count--;
}
于 2009-05-06T02:18:59.330 回答
16
// Note power of two buffer size
#define kNumPointsInMyBuffer 1024 

typedef struct _ringBuffer {
    UInt32 currentIndex;
    UInt32 sizeOfBuffer;
    double data[kNumPointsInMyBuffer];
} ringBuffer;

// Initialize the ring buffer
ringBuffer *myRingBuffer = (ringBuffer *)calloc(1, sizeof(ringBuffer));
myRingBuffer->sizeOfBuffer = kNumPointsInMyBuffer;
myRingBuffer->currentIndex = 0;

// A little function to write into the buffer
// N.B. First argument of writeIntoBuffer() just happens to have the
// same as the one calloc'ed above. It will only point to the same
// space in memory if the calloc'ed pointer is passed to
// writeIntoBuffer() as an arg when the function is called. Consider
// using another name for clarity
void writeIntoBuffer(ringBuffer *myRingBuffer, double *myData, int numsamples) {
    // -1 for our binary modulo in a moment
    int buffLen = myRingBuffer->sizeOfBuffer - 1;
    int lastWrittenSample = myRingBuffer->currentIndex;

    int idx;
    for (int i=0; i < numsamples; ++i) {
        // modulo will automagically wrap around our index
        idx = (i + lastWrittenSample) & buffLen; 
        myRingBuffer->data[idx] = myData[i];
    }

    // Update the current index of our ring buffer.
    myRingBuffer->currentIndex += numsamples;
    myRingBuffer->currentIndex &= myRingBuffer->sizeOfBuffer - 1;
}

只要你的环形缓冲区的长度是 2 的幂,令人难以置信的快速二进制“&”操作将为你环绕你的索引。对于我的应用程序,我从麦克风获取的音频环形缓冲区向用户显示一段音频。

我总是确保可以在屏幕上显示的最大音频量远小于环形缓冲区的大小。否则,您可能正在从同一个块读取和写入。这可能会给您带来奇怪的显示伪影。

于 2009-11-20T16:22:27.810 回答
11

您可以在编写缓冲区代码时枚举所需的类型,还是需要能够在运行时通过动态调用添加类型?如果是前者,那么我会将缓冲区创建为由 n 个结构组成的堆分配数组,其中每个结构由两个元素组成:标识数据类型的枚举标记和所有数据类型的联合。您在小元素的额外存储方面所失去的东西,您可以通过不必处理分配/解除分配以及由此产生的内存碎片来弥补。然后,您只需要跟踪定义缓冲区头部和尾部元素的开始和结束索引,并确保在递增/递减索引时计算 mod n。

于 2009-05-06T02:45:21.310 回答
10

首先,标题。如果您使用位整数来保存头部和尾部“指针”,并且调整它们的大小以使它们完全同步,则不需要模运算来包装缓冲区。IE:填充到 12 位无符号整数中的 4096 本身就是 0,不受任何干扰。消除模运算,即使是 2 的幂,速度也会翻倍——几乎完全一样。

在我的第三代 i7 Dell XPS 8500 上使用 Visual Studio 2010 的 C++ 编译器和默认内联,填充和排出任何类型数据元素的 4096 缓冲区的 1000 万次迭代需要 52 秒,其中 1/8192 用于服务数据。

我将 RX 重写 main() 中的测试循环,这样它们就不再控制流程——而且应该由指示缓冲区满或空的返回值以及随之而来的中断来控制;陈述。IE:填料和排水器应该能够相互碰撞而不会损坏或不稳定。在某个时候,我希望对这段代码进行多线程处理,因此这种行为将是至关重要的。

QUEUE_DESC(队列描述符)和初始化函数强制此代码中的所有缓冲区为 2 的幂。否则上述方案将不起作用。在这个主题上,请注意 QUEUE_DESC 不是硬编码的,它使用清单常量 (#define BITS_ELE_KNT) 进行构造。(我假设 2 的幂在这里足够灵活)

为了使缓冲区大小运行时可选择,我尝试了不同的方法(此处未显示),并决定使用能够管理 FIFO 缓冲区 [USHRT] 的 Head、Tail、EleKnt 的 USHRT。为了避免模运算,我用 Head, Tail 创建了一个 && 的掩码,但这个掩码原来是 (EleKnt -1),所以就使用它。使用 USHRTS 而不是位整数在安静的机器上提高了约 15% 的性能。英特尔 CPU 内核总是比它们的总线快,因此在繁忙的共享机器上,打包数据结构可以让你在其他竞争线程之前加载和执行。权衡取舍。

请注意,缓冲区的实际存储空间是使用 calloc() 在堆上分配的,并且指针位于结构的底部,因此结构和指针具有完全相同的地址。IE; 不需要将偏移量添加到结构地址以占用寄存器。

同样,与服务缓冲区相关的所有变量在物理上与缓冲区相邻,绑定到同一个结构中,因此编译器可以生成漂亮的汇编语言。您必须终止内联优化才能看到任何程序集,否则它会被遗忘。

为了支持任何数据类型的多态性,我使用了 memcpy() 而不是赋值。如果您只需要在每次编译时灵活地支持一种随机变量类型,那么这段代码就可以完美运行。

对于多态性,您只需要知道类型及其存储要求。描述符的 DATA_DESC 数组提供了一种方法来跟踪放入 QUEUE_DESC.pB​​uffer 中的每个数据,以便可以正确检索它。我只需分配足够的 pBuffer 内存来保存最大数据类型的所有元素,但要跟踪给定数据在 DATA_DESC.dBytes 中实际使用了多少存储空间。另一种方法是重新发明一个堆管理器。

这意味着 QUEUE_DESC 的 UCHAR *pBuffer 将有一个并行的伴随数组来跟踪数据类型和大小,而数据在 pBuffer 中的存储位置将保持原样。新成员可能类似于 DATA_DESC *pDataDesc,或者可能是 DATA_DESC DataDesc[2^BITS_ELE_KNT],如果您能找到一种方法,通过这样的前向引用使您的编译器提交。Calloc() 在这些情况下总是更灵活。

您仍然会在 Q_Put(),Q_Get 中使用 memcpy(),但实际复制的字节数将由 DATA_DESC.dBytes 决定,而不是由 QUEUE_DESC.EleBytes 决定。对于任何给定的 put 或 get,元素可能具有所有不同的类型/大小。

我相信这段代码可以满足速度和缓冲区大小的要求,并且可以满足 6 种不同数据类型的要求。我以 printf() 语句的形式保留了许多测试装置,因此您可以让自己(或不)确信代码正常工作。随机数生成器表明该代码适用于任何随机头/尾组合。

enter code here
// Queue_Small.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include <time.h>
#include <limits.h>
#include <stdlib.h>
#include <malloc.h>
#include <memory.h>
#include <math.h>

#define UCHAR unsigned char
#define ULONG unsigned long
#define USHRT unsigned short
#define dbl   double
/* Queue structure */
#define QUEUE_FULL_FLAG 1
#define QUEUE_EMPTY_FLAG -1
#define QUEUE_OK 0
//  
#define BITS_ELE_KNT    12  //12 bits will create 4.096 elements numbered 0-4095
//
//typedef struct    {
//  USHRT dBytes:8;     //amount of QUEUE_DESC.EleBytes storage used by datatype
//  USHRT dType :3; //supports 8 possible data types (0-7)
//  USHRT dFoo  :5; //unused bits of the unsigned short host's storage
// }    DATA_DESC;
//  This descriptor gives a home to all the housekeeping variables
typedef struct  {
    UCHAR   *pBuffer;   //  pointer to storage, 16 to 4096 elements
    ULONG Tail  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG Head  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG EleBytes  :8;     //  sizeof(elements) with range of 0-256 bytes
    // some unused bits will be left over if BITS_ELE_KNT < 12
    USHRT EleKnt    :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096)
    //USHRT Flags   :(8*sizeof(USHRT) - BITS_ELE_KNT +1);   //  flags you can use
    USHRT   IsFull  :1;     // queue is full
    USHRT   IsEmpty :1;     // queue is empty
    USHRT   Unused  :1;     // 16th bit of USHRT
}   QUEUE_DESC;

//  ---------------------------------------------------------------------------
//  Function prototypes
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz);
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew);
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q);
//  ---------------------------------------------------------------------------
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz)    {
    memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero
    //select buffer size from powers of 2 to receive modulo 
    //                arithmetic benefit of bit uints overflowing
    Q->EleKnt   =   (USHRT)pow(2.0, BitsForEleKnt);
    Q->EleBytes =   DataTypeSz; // how much storage for each element?
    //  Randomly generated head, tail a test fixture only. 
    //      Demonstrates that the queue can be entered at a random point 
    //      and still perform properly. Normally zero
    srand(unsigned(time(NULL)));    // seed random number generator with current time
    Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset
    Q->Head = Q->Tail = 0;
    //  allocate queue's storage
    if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes)))  {
        return NULL;
    }   else    {
        return Q;
    }
}
//  ---------------------------------------------------------------------------
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew)   
{
    memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes);
    if(Q->Tail == (Q->Head + Q->EleKnt)) {
        //  Q->IsFull = 1;
        Q->Tail += 1;   
        return QUEUE_FULL_FLAG; //  queue is full
    }
    Q->Tail += 1;   //  the unsigned bit int MUST wrap around, just like modulo
    return QUEUE_OK; // No errors
}
//  ---------------------------------------------------------------------------
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q)   
{
    memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes);
    Q->Head += 1;   //  the bit int MUST wrap around, just like modulo

    if(Q->Head == Q->Tail)      {
        //  Q->IsEmpty = 1;
        return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get
    }
    return QUEUE_OK; // No errors
}
//
//  ---------------------------------------------------------------------------
int _tmain(int argc, _TCHAR* argv[])    {
//  constrain buffer size to some power of 2 to force faux modulo arithmetic
    int LoopKnt = 1000000;  //  for benchmarking purposes only
    int k, i=0, Qview=0;
    time_t start;
    QUEUE_DESC Queue, *Q;
    if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) {
        printf("\nProgram failed to initialize. Aborting.\n\n");
        return 0;
    }

    start = clock();
    for(k=0; k<LoopKnt; k++)    {
        //printf("\n\n Fill'er up please...\n");
        //Q->Head = Q->Tail = rand();
        for(i=1; i<= Q->EleKnt; i++)    {
            Qview = i*i;
            if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview))    {
                //printf("\nQueue is full at %i \n", i);
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //  Get data from queue until completely drained (empty)
        //
        //printf("\n\n Step into the lab, and see what's on the slab... \n");
        Qview = 0;
        for(i=1; i; i++)    {
            if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q))   {
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                //printf("\nQueue is empty at %i", i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    }
    printf("\nQueue time was %5.3f to fill & drain %i element queue  %i times \n", 
                     (dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt);
    printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    getchar();
    return 0;
}
于 2012-12-15T00:29:17.197 回答
8

这是 C 语言中的一个简单解决方案。假设每个函数都关闭了中断。没有多态性和东西,只是常识。


#define BUFSIZE 128
char buf[BUFSIZE];
char *pIn, *pOut, *pEnd;
char full;

// init
void buf_init()
{
    pIn = pOut = buf;       // init to any slot in buffer
    pEnd = &buf[BUFSIZE];   // past last valid slot in buffer
    full = 0;               // buffer is empty
}

// add char 'c' to buffer
int buf_put(char c)
{
    if (pIn == pOut  &&  full)
        return 0;           // buffer overrun

    *pIn++ = c;             // insert c into buffer
    if (pIn >= pEnd)        // end of circular buffer?
        pIn = buf;          // wrap around

    if (pIn == pOut)        // did we run into the output ptr?
        full = 1;           // can't add any more data into buffer
    return 1;               // all OK
}

// get a char from circular buffer
int buf_get(char *pc)
{
    if (pIn == pOut  &&  !full)
        return 0;           // buffer empty  FAIL

    *pc = *pOut++;              // pick up next char to be returned
    if (pOut >= pEnd)       // end of circular buffer?
        pOut = buf;         // wrap around

    full = 0;               // there is at least 1 slot
    return 1;               // *pc has the data to be returned
}
于 2012-12-26T22:28:47.477 回答
2

一个简单的实现可能包括:

  • 一个缓冲区,实现为大小为 n 的数组,您需要的任何类型
  • 读取指针或索引(以您的处理器更有效的为准)
  • 写指针或索引
  • 指示缓冲区中有多少数据的计数器(可从读写指针派生,但单独跟踪它更快)

每次写入数据时,都会推进写入指针并递增计数器。当您读取数据时,您增加读取指针并减少计数器。如果任一指​​针达到 n,则将其设置为零。

如果 counter = n,你不能写。如果计数器 = 0,您将无法读取。

于 2009-05-06T12:14:46.030 回答
2

C 风格,简单的整数环形缓冲区。首先使用 init,而不是使用 put 和 get。如果缓冲区不包含任何数据,则返回“0”零。

//=====================================
// ring buffer address based
//=====================================
#define cRingBufCount   512
int     sRingBuf[cRingBufCount];    // Ring Buffer
int     sRingBufPut;                // Input index address
int     sRingBufGet;                // Output index address
Bool    sRingOverWrite;

void    GetRingBufCount(void)
{
int     r;
`       r= sRingBufPut - sRingBufGet;
        if ( r < cRingBufCount ) r+= cRingBufCount;
        return r; 
}

void    InitRingBuffer(void)
{
        sRingBufPut= 0;
        sRingBufGet= 0;
}       

void    PutRingBuffer(int d)
{
        sRingBuffer[sRingBufPut]= d;
        if (sRingBufPut==sRingBufGet)// both address are like ziro
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            sRingBufGet= IncRingBufferPointer(sRingBufGet);
        }
        else //Put over write a data
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            if (sRingBufPut==sRingBufGet)
            {
                sRingOverWrite= Ture;
                sRingBufGet= IncRingBufferPointer(sRingBufGet);
            }
        }
}

int     GetRingBuffer(void)
{
int     r;
        if (sRingBufGet==sRingBufPut) return 0;
        r= sRingBuf[sRingBufGet];
        sRingBufGet= IncRingBufferPointer(sRingBufGet);
        sRingOverWrite=False;
        return r;
}

int     IncRingBufferPointer(int a)
{
        a+= 1;
        if (a>= cRingBufCount) a= 0;
        return a;
}
于 2015-10-12T22:13:05.250 回答
1

@Adam Rosenfield 的解决方案虽然是正确的,但可以使用更轻量级的circular_buffer结构来实现,该结构不涉及countcapacity.

该结构只能包含以下 4 个指针:

  • buffer: 指向内存中缓冲区的开始。
  • buffer_end: 指向内存中缓冲区的末尾。
  • head:指向存储数据的结尾。
  • tail:指向存储数据的开始。

我们可以保留该sz属性以允许对存储单元进行参数化。

count和值都capacity应该可以使用上述指针进行派生。

容量

capacity是直截了当的,因为它可以通过将buffer_end指针和buffer指针之间的距离除以存储单元来得出sz(下面的代码段是伪代码):

capacity = (buffer_end - buffer) / sz

数数

不过,对于计数,事情变得有点复杂。head例如,在和tail指向同一个位置的场景下,无法判断缓冲区是空还是满。

为了解决这个问题,缓冲区应该为额外的元素分配内存。例如,如果我们的循环缓冲区所需的容量是10 * sz,那么我们需要分配11 * sz

容量公式将变为(下面的代码段是伪代码):

capacity_bytes = buffer_end - buffer - sz
capacity = capacity_bytes / sz

这种额外的元素语义允许我们构建评估缓冲区是空还是满的条件。

空状态条件

为了使缓冲区为空,head指针指向与指针相同的位置tail

head == tail

如果上述计算结果为真,则缓冲区为空。

全状态条件

为了使缓冲区满,head指针应该是tail指针后面的 1 个元素。head因此,从该位置跳转到该位置所需覆盖的空间tail应等于1 * sz

如果tail大于head

tail - head == sz

如果上述计算结果为真,则缓冲区已满。

如果head大于tail

  1. buffer_end - head返回从head缓冲区跳转到末尾的空间。
  2. tail - buffer返回从缓冲区开头跳转到尾部所需的空间。
  3. 加上上面 2 应该等于从 the 跳转head到 thetail
  4. 步骤 3 中导出的空间,不大于1 * sz
(buffer_end - head) + (tail - buffer) == sz
=> buffer_end - buffer - head + tail == sz
=> buffer_end - buffer - sz == head - tail
=> head - tail == buffer_end - buffer - sz
=> head - tail == capacity_bytes

如果上述计算结果为真,则缓冲区已满。

在实践中

修改@Adam Rosenfield 以使用上述circular_buffer结构:

#include <string.h>

#define CB_SUCCESS 0        /* CB operation was successful */
#define CB_MEMORY_ERROR 1   /* Failed to allocate memory */
#define CB_OVERFLOW_ERROR 2 /* CB is full. Cannot push more items. */
#define CB_EMPTY_ERROR 3    /* CB is empty. Cannot pop more items. */

typedef struct circular_buffer {
  void *buffer;
  void *buffer_end;
  size_t sz;
  void *head;
  void *tail;
} circular_buffer;

int cb_init(circular_buffer *cb, size_t capacity, size_t sz) {
  const int incremented_capacity = capacity + 1; // Add extra element to evaluate count
  cb->buffer = malloc(incremented_capacity * sz);
  if (cb->buffer == NULL)
    return CB_MEMORY_ERROR;
  cb->buffer_end = (char *)cb->buffer + incremented_capacity * sz;
  cb->sz = sz;
  cb->head = cb->buffer;
  cb->tail = cb->buffer;
  return CB_SUCCESS;
}

int cb_free(circular_buffer *cb) {
  free(cb->buffer);
  return CB_SUCCESS;
}

const int _cb_length(circular_buffer *cb) {
  return (char *)cb->buffer_end - (char *)cb->buffer;
}

int cb_push_back(circular_buffer *cb, const void *item) {
  const int buffer_length = _cb_length(cb);
  const int capacity_length = buffer_length - cb->sz;

  if ((char *)cb->tail - (char *)cb->head == cb->sz ||
      (char *)cb->head - (char *)cb->tail == capacity_length)
    return CB_OVERFLOW_ERROR;

  memcpy(cb->head, item, cb->sz);

  cb->head = (char*)cb->head + cb->sz;
  if(cb->head == cb->buffer_end)
    cb->head = cb->buffer;

  return CB_SUCCESS;
}

int cb_pop_front(circular_buffer *cb, void *item) {
  if (cb->head == cb->tail)
    return CB_EMPTY_ERROR;

  memcpy(item, cb->tail, cb->sz);

  cb->tail = (char*)cb->tail + cb->sz;
  if(cb->tail == cb->buffer_end)
    cb->tail = cb->buffer;

  return CB_SUCCESS;
}

于 2020-12-23T14:44:36.100 回答
0

扩展 adam-rosenfield 的解决方案,我认为以下内容适用于多线程单生产者 - 单消费者场景。

int cb_push_back(circular_buffer *cb, const void *item)
{
  void *new_head = (char *)cb->head + cb->sz;
  if (new_head == cb>buffer_end) {
      new_head = cb->buffer;
  }
  if (new_head == cb->tail) {
    return 1;
  }
  memcpy(cb->head, item, cb->sz);
  cb->head = new_head;
  return 0;
}

int cb_pop_front(circular_buffer *cb, void *item)
{
  void *new_tail = cb->tail + cb->sz;
  if (cb->head == cb->tail) {
    return 1;
  }
  memcpy(item, cb->tail, cb->sz);
  if (new_tail == cb->buffer_end) {
    new_tail = cb->buffer;
  }
  cb->tail = new_tail;
  return 0;
}
于 2020-08-02T19:03:33.560 回答