首先,标题。如果您使用位整数来保存头部和尾部“指针”,并且调整它们的大小以使它们完全同步,则不需要模运算来包装缓冲区。IE:填充到 12 位无符号整数中的 4096 本身就是 0,不受任何干扰。消除模运算,即使是 2 的幂,速度也会翻倍——几乎完全一样。
在我的第三代 i7 Dell XPS 8500 上使用 Visual Studio 2010 的 C++ 编译器和默认内联,填充和排出任何类型数据元素的 4096 缓冲区的 1000 万次迭代需要 52 秒,其中 1/8192 用于服务数据。
我将 RX 重写 main() 中的测试循环,这样它们就不再控制流程——而且应该由指示缓冲区满或空的返回值以及随之而来的中断来控制;陈述。IE:填料和排水器应该能够相互碰撞而不会损坏或不稳定。在某个时候,我希望对这段代码进行多线程处理,因此这种行为将是至关重要的。
QUEUE_DESC(队列描述符)和初始化函数强制此代码中的所有缓冲区为 2 的幂。否则上述方案将不起作用。在这个主题上,请注意 QUEUE_DESC 不是硬编码的,它使用清单常量 (#define BITS_ELE_KNT) 进行构造。(我假设 2 的幂在这里足够灵活)
为了使缓冲区大小运行时可选择,我尝试了不同的方法(此处未显示),并决定使用能够管理 FIFO 缓冲区 [USHRT] 的 Head、Tail、EleKnt 的 USHRT。为了避免模运算,我用 Head, Tail 创建了一个 && 的掩码,但这个掩码原来是 (EleKnt -1),所以就使用它。使用 USHRTS 而不是位整数在安静的机器上提高了约 15% 的性能。英特尔 CPU 内核总是比它们的总线快,因此在繁忙的共享机器上,打包数据结构可以让你在其他竞争线程之前加载和执行。权衡取舍。
请注意,缓冲区的实际存储空间是使用 calloc() 在堆上分配的,并且指针位于结构的底部,因此结构和指针具有完全相同的地址。IE; 不需要将偏移量添加到结构地址以占用寄存器。
同样,与服务缓冲区相关的所有变量在物理上与缓冲区相邻,绑定到同一个结构中,因此编译器可以生成漂亮的汇编语言。您必须终止内联优化才能看到任何程序集,否则它会被遗忘。
为了支持任何数据类型的多态性,我使用了 memcpy() 而不是赋值。如果您只需要在每次编译时灵活地支持一种随机变量类型,那么这段代码就可以完美运行。
对于多态性,您只需要知道类型及其存储要求。描述符的 DATA_DESC 数组提供了一种方法来跟踪放入 QUEUE_DESC.pBuffer 中的每个数据,以便可以正确检索它。我只需分配足够的 pBuffer 内存来保存最大数据类型的所有元素,但要跟踪给定数据在 DATA_DESC.dBytes 中实际使用了多少存储空间。另一种方法是重新发明一个堆管理器。
这意味着 QUEUE_DESC 的 UCHAR *pBuffer 将有一个并行的伴随数组来跟踪数据类型和大小,而数据在 pBuffer 中的存储位置将保持原样。新成员可能类似于 DATA_DESC *pDataDesc,或者可能是 DATA_DESC DataDesc[2^BITS_ELE_KNT],如果您能找到一种方法,通过这样的前向引用使您的编译器提交。Calloc() 在这些情况下总是更灵活。
您仍然会在 Q_Put(),Q_Get 中使用 memcpy(),但实际复制的字节数将由 DATA_DESC.dBytes 决定,而不是由 QUEUE_DESC.EleBytes 决定。对于任何给定的 put 或 get,元素可能具有所有不同的类型/大小。
我相信这段代码可以满足速度和缓冲区大小的要求,并且可以满足 6 种不同数据类型的要求。我以 printf() 语句的形式保留了许多测试装置,因此您可以让自己(或不)确信代码正常工作。随机数生成器表明该代码适用于任何随机头/尾组合。
enter code here
// Queue_Small.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include <time.h>
#include <limits.h>
#include <stdlib.h>
#include <malloc.h>
#include <memory.h>
#include <math.h>
#define UCHAR unsigned char
#define ULONG unsigned long
#define USHRT unsigned short
#define dbl double
/* Queue structure */
#define QUEUE_FULL_FLAG 1
#define QUEUE_EMPTY_FLAG -1
#define QUEUE_OK 0
//
#define BITS_ELE_KNT 12 //12 bits will create 4.096 elements numbered 0-4095
//
//typedef struct {
// USHRT dBytes:8; //amount of QUEUE_DESC.EleBytes storage used by datatype
// USHRT dType :3; //supports 8 possible data types (0-7)
// USHRT dFoo :5; //unused bits of the unsigned short host's storage
// } DATA_DESC;
// This descriptor gives a home to all the housekeeping variables
typedef struct {
UCHAR *pBuffer; // pointer to storage, 16 to 4096 elements
ULONG Tail :BITS_ELE_KNT; // # elements, with range of 0-4095
ULONG Head :BITS_ELE_KNT; // # elements, with range of 0-4095
ULONG EleBytes :8; // sizeof(elements) with range of 0-256 bytes
// some unused bits will be left over if BITS_ELE_KNT < 12
USHRT EleKnt :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096)
//USHRT Flags :(8*sizeof(USHRT) - BITS_ELE_KNT +1); // flags you can use
USHRT IsFull :1; // queue is full
USHRT IsEmpty :1; // queue is empty
USHRT Unused :1; // 16th bit of USHRT
} QUEUE_DESC;
// ---------------------------------------------------------------------------
// Function prototypes
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz);
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew);
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q);
// ---------------------------------------------------------------------------
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz) {
memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero
//select buffer size from powers of 2 to receive modulo
// arithmetic benefit of bit uints overflowing
Q->EleKnt = (USHRT)pow(2.0, BitsForEleKnt);
Q->EleBytes = DataTypeSz; // how much storage for each element?
// Randomly generated head, tail a test fixture only.
// Demonstrates that the queue can be entered at a random point
// and still perform properly. Normally zero
srand(unsigned(time(NULL))); // seed random number generator with current time
Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset
Q->Head = Q->Tail = 0;
// allocate queue's storage
if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes))) {
return NULL;
} else {
return Q;
}
}
// ---------------------------------------------------------------------------
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew)
{
memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes);
if(Q->Tail == (Q->Head + Q->EleKnt)) {
// Q->IsFull = 1;
Q->Tail += 1;
return QUEUE_FULL_FLAG; // queue is full
}
Q->Tail += 1; // the unsigned bit int MUST wrap around, just like modulo
return QUEUE_OK; // No errors
}
// ---------------------------------------------------------------------------
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q)
{
memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes);
Q->Head += 1; // the bit int MUST wrap around, just like modulo
if(Q->Head == Q->Tail) {
// Q->IsEmpty = 1;
return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get
}
return QUEUE_OK; // No errors
}
//
// ---------------------------------------------------------------------------
int _tmain(int argc, _TCHAR* argv[]) {
// constrain buffer size to some power of 2 to force faux modulo arithmetic
int LoopKnt = 1000000; // for benchmarking purposes only
int k, i=0, Qview=0;
time_t start;
QUEUE_DESC Queue, *Q;
if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) {
printf("\nProgram failed to initialize. Aborting.\n\n");
return 0;
}
start = clock();
for(k=0; k<LoopKnt; k++) {
//printf("\n\n Fill'er up please...\n");
//Q->Head = Q->Tail = rand();
for(i=1; i<= Q->EleKnt; i++) {
Qview = i*i;
if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview)) {
//printf("\nQueue is full at %i \n", i);
//printf("\nQueue value of %i should be %i squared", Qview, i);
break;
}
//printf("\nQueue value of %i should be %i squared", Qview, i);
}
// Get data from queue until completely drained (empty)
//
//printf("\n\n Step into the lab, and see what's on the slab... \n");
Qview = 0;
for(i=1; i; i++) {
if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q)) {
//printf("\nQueue value of %i should be %i squared", Qview, i);
//printf("\nQueue is empty at %i", i);
break;
}
//printf("\nQueue value of %i should be %i squared", Qview, i);
}
//printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
}
printf("\nQueue time was %5.3f to fill & drain %i element queue %i times \n",
(dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt);
printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
getchar();
return 0;
}