0
void AFCQueue::ExtractValuesSecComplex(int startIndex, int endIndex,int helperIndex)
{

int size = 0,i,index;
TimeType min_timestamp;
bool is_singleQueue = false;
TimeType* local_queue_time = helper_queue_time[helperIndex];
int* local_queue_value = helper_queue_value[helperIndex];
volatile int& in_local_helper = in_sec[helperIndex];
volatile int& out_local_helper = out_sec[helperIndex];

NodeArrayBlock * heads_local[_MAX_THREADS];
NodeArrayBlock *tails_local[_MAX_THREADS];
int outs_local[_MAX_THREADS];
int ins_local[_MAX_THREADS];
TimeType local_timearray[_MAX_THREADS];
int min_index = 0;

min_timestamp = timestamps_arr[startIndex];
for (i=startIndex,index=startIndex ; i < endIndex; i++){
    heads_local[i] = (NodeArrayBlock *)heads[i];
    outs_local[i]  = outs[i];
    tails_local[i] = (NodeArrayBlock *)tails[i];
    ins_local[i]   = ins[i];
    local_timearray[i] = timestamps_arr[i];

    if (local_timearray[i] < min_timestamp){
        min_timestamp = local_timearray[i];
        index = i;
    }
}

do{
    //if central queue is full 
    while((out_local_helper-1)==in_local_helper || 
        (out_local_helper==0 && in_local_helper == HELPERS_QUEUE_SIZE_1) || _gIsStopThreads){
        if (_gIsStopThreads)
            return;
    }

    local_queue_time[in_local_helper] = heads_local[index]->_timestamp_arr[outs_local[index]];
    local_queue_value[in_local_helper] = heads_local[index]->_values_arr[outs_local[index]++];

    if (in_local_helper < HELPERS_QUEUE_SIZE_1)
        in_local_helper++;
    else
        in_local_helper = 0;    

    if (outs_local[index] == _INIT_SIZE){
        heads_local[index]->_free = true;
        heads_local[index] = heads_local[index]->_next;
        if (heads_local[index]==null)
        {
            tails_local[index] = null;
            ins_local[index]=0;
        }
        outs_local[index] = 0;
    }
    if (ins_local[index] == outs_local[index] &&
        heads_local[index]==tails_local[index])
    {
        //if it was not the last local queue in the array of snapshots
        if (--endIndex != index){
            heads_local[index]                  = heads_local[endIndex];
            tails_local[index]                  = tails_local[endIndex];
            outs_local[index]                   = outs_local[endIndex];
            ins_local[index]                    = ins_local[endIndex];
            local_timearray[index]              = local_timearray[endIndex];
        }
        if ((endIndex-startIndex)==1)
            is_singleQueue = true;
        heads_local[endIndex]                   = null;
    }else{
        local_timearray[index] = heads_local[index]->_timestamp_arr[outs_local[index]];
    }
    //If a single Queue left, no need to check timestamps
    if (is_singleQueue){
        int out = outs_local[startIndex];
        int in  = ins_local[startIndex];
        NodeArrayBlock* he = heads_local[startIndex];
        NodeArrayBlock* ta = tails_local[startIndex];
        int* value_arr = he->_values_arr;
        TimeType* time_arr = he->_timestamp_arr;
        while (true){
            if ((in == out && he==ta))
            {
                //heads[startIndex] = null;
                return;
            }
            if (out == _INIT_SIZE){
                he->_free = true;
                he = he->_next;
                if (he==null)
                {
                    //heads[startIndex]=null;
                    return;
                }
                value_arr = he->_values_arr;
                time_arr = he->_timestamp_arr;
                out = 0;
            }   
            while((out_local_helper-1)==in_local_helper || 
                (out_local_helper==0 && in_local_helper == HELPERS_QUEUE_SIZE_1) || 
                _gIsStopThreads){
                if (_gIsStopThreads)
                    return;
            }

            if (he==ta){
                if (out_local_helper <= in_local_helper){
                    min_index = Math::Min(HELPERS_QUEUE_SIZE-in_local_helper,in-out);
                }else{
                    min_index = Math::Min(out_local_helper-1-in_local_helper,in-out);
                }
            }else{
                if (out_local_helper <= in_local_helper){
                    min_index = Math::Min(HELPERS_QUEUE_SIZE-in_local_helper,_INIT_SIZE-out);
                }else{
                    min_index = Math::Min(out_local_helper-1-in_local_helper,_INIT_SIZE-out);
                }
            }
            memcpy(&local_queue_time[in_local_helper],&time_arr[out],min_index * sizeof(*time_arr));
            memcpy(&local_queue_value[in_local_helper],&value_arr[out],min_index * sizeof(*value_arr));
            in_local_helper+=min_index;
            out+=min_index;
            if (in_local_helper == HELPERS_QUEUE_SIZE)
                in_local_helper = 0;
        }
    }
    if (endIndex==startIndex)
        break;

    min_timestamp = local_timearray[startIndex];
    for(i = startIndex+1,index=startIndex; i < endIndex ;i++){
        if (local_timearray[i] < min_timestamp){
            min_timestamp = local_timearray[i];
            index = i;
        }
    }
}while(true);
}

这是我的算法的一个片段,该函数专用于迭代队列数量的单个线程(有带有时间戳的队列,并且分别有带有值的队列)

执行此方法的每个线程都会遍历 X 个队列并将它们合并到时间戳和值的单个循环队列中。

这个函数有很多缓存未命中,

如何改进以减少缓存未命中(多个线程同时使用不同的 id 执行此方法 - helperIndex)

4

2 回答 2

1

当然,您有很多缓存未命中。您必须记住的是,CPU 与 L3 缓存和 RAM 之间只有一条总线。因此,如果线程一忙于读取内存,然后被挂起以允许另一个线程执行相同的操作但使用不同的内存,则需要重新加载缓存。当进程暂停时也会发生这种情况 - 当进程恢复执行时,缓存需要重新加载。

要限制缓存未命中,请将线程数限制为物理内核数(忽略超线程内核)。如果您的线程数多于内核数,则每次有线程切换时都需要更新缓存。如果您拥有相同数量的线程和内核,则可以减少将缓存丢失给另一个线程/进程的机会。您想尝试 num_cores - 1 个线程,看看是否有帮助。

此外,您的代码非常大且未注释。很难看出你到底在做什么。

于 2012-06-27T08:46:04.040 回答
1

这不是真正的代码问题,因此发布代码无效。

要限制缓存未命中,请更改 DATA 以便每个线程一次只需要处理自己的 [L1 缓存大小] 块。这对于合并排序来说相当容易。

一个典型的、有效的合并排序将使用一个线程池和合并任务,这些任务拆分它们的输入分区并产生子合并,直到任务得到一个小于 [L1 缓存大小] 的分区,然后使用就地排序,像快速排序一样,完成最后的位。

拆分可以通过一个额外的 [数据大小] 缓冲区来完成,任务在完成快速排序后在执行插入排序时在该缓冲区之间移动数据。应该不需要任何内存复制。

仅仅采用单线程内联代码并在不考虑数据的情况下使其在多个线程上工作是无效的。

于 2012-06-27T09:03:40.653 回答