void AFCQueue::ExtractValuesSecComplex(int startIndex, int endIndex,int helperIndex)
{
int size = 0,i,index;
TimeType min_timestamp;
bool is_singleQueue = false;
TimeType* local_queue_time = helper_queue_time[helperIndex];
int* local_queue_value = helper_queue_value[helperIndex];
volatile int& in_local_helper = in_sec[helperIndex];
volatile int& out_local_helper = out_sec[helperIndex];
NodeArrayBlock * heads_local[_MAX_THREADS];
NodeArrayBlock *tails_local[_MAX_THREADS];
int outs_local[_MAX_THREADS];
int ins_local[_MAX_THREADS];
TimeType local_timearray[_MAX_THREADS];
int min_index = 0;
min_timestamp = timestamps_arr[startIndex];
for (i=startIndex,index=startIndex ; i < endIndex; i++){
heads_local[i] = (NodeArrayBlock *)heads[i];
outs_local[i] = outs[i];
tails_local[i] = (NodeArrayBlock *)tails[i];
ins_local[i] = ins[i];
local_timearray[i] = timestamps_arr[i];
if (local_timearray[i] < min_timestamp){
min_timestamp = local_timearray[i];
index = i;
}
}
do{
//if central queue is full
while((out_local_helper-1)==in_local_helper ||
(out_local_helper==0 && in_local_helper == HELPERS_QUEUE_SIZE_1) || _gIsStopThreads){
if (_gIsStopThreads)
return;
}
local_queue_time[in_local_helper] = heads_local[index]->_timestamp_arr[outs_local[index]];
local_queue_value[in_local_helper] = heads_local[index]->_values_arr[outs_local[index]++];
if (in_local_helper < HELPERS_QUEUE_SIZE_1)
in_local_helper++;
else
in_local_helper = 0;
if (outs_local[index] == _INIT_SIZE){
heads_local[index]->_free = true;
heads_local[index] = heads_local[index]->_next;
if (heads_local[index]==null)
{
tails_local[index] = null;
ins_local[index]=0;
}
outs_local[index] = 0;
}
if (ins_local[index] == outs_local[index] &&
heads_local[index]==tails_local[index])
{
//if it was not the last local queue in the array of snapshots
if (--endIndex != index){
heads_local[index] = heads_local[endIndex];
tails_local[index] = tails_local[endIndex];
outs_local[index] = outs_local[endIndex];
ins_local[index] = ins_local[endIndex];
local_timearray[index] = local_timearray[endIndex];
}
if ((endIndex-startIndex)==1)
is_singleQueue = true;
heads_local[endIndex] = null;
}else{
local_timearray[index] = heads_local[index]->_timestamp_arr[outs_local[index]];
}
//If a single Queue left, no need to check timestamps
if (is_singleQueue){
int out = outs_local[startIndex];
int in = ins_local[startIndex];
NodeArrayBlock* he = heads_local[startIndex];
NodeArrayBlock* ta = tails_local[startIndex];
int* value_arr = he->_values_arr;
TimeType* time_arr = he->_timestamp_arr;
while (true){
if ((in == out && he==ta))
{
//heads[startIndex] = null;
return;
}
if (out == _INIT_SIZE){
he->_free = true;
he = he->_next;
if (he==null)
{
//heads[startIndex]=null;
return;
}
value_arr = he->_values_arr;
time_arr = he->_timestamp_arr;
out = 0;
}
while((out_local_helper-1)==in_local_helper ||
(out_local_helper==0 && in_local_helper == HELPERS_QUEUE_SIZE_1) ||
_gIsStopThreads){
if (_gIsStopThreads)
return;
}
if (he==ta){
if (out_local_helper <= in_local_helper){
min_index = Math::Min(HELPERS_QUEUE_SIZE-in_local_helper,in-out);
}else{
min_index = Math::Min(out_local_helper-1-in_local_helper,in-out);
}
}else{
if (out_local_helper <= in_local_helper){
min_index = Math::Min(HELPERS_QUEUE_SIZE-in_local_helper,_INIT_SIZE-out);
}else{
min_index = Math::Min(out_local_helper-1-in_local_helper,_INIT_SIZE-out);
}
}
memcpy(&local_queue_time[in_local_helper],&time_arr[out],min_index * sizeof(*time_arr));
memcpy(&local_queue_value[in_local_helper],&value_arr[out],min_index * sizeof(*value_arr));
in_local_helper+=min_index;
out+=min_index;
if (in_local_helper == HELPERS_QUEUE_SIZE)
in_local_helper = 0;
}
}
if (endIndex==startIndex)
break;
min_timestamp = local_timearray[startIndex];
for(i = startIndex+1,index=startIndex; i < endIndex ;i++){
if (local_timearray[i] < min_timestamp){
min_timestamp = local_timearray[i];
index = i;
}
}
}while(true);
}
这是我的算法的一个片段,该函数专用于迭代队列数量的单个线程(有带有时间戳的队列,并且分别有带有值的队列)
执行此方法的每个线程都会遍历 X 个队列并将它们合并到时间戳和值的单个循环队列中。
这个函数有很多缓存未命中,
如何改进以减少缓存未命中(多个线程同时使用不同的 id 执行此方法 - helperIndex)