所以我正在使用 Cilk 在 C 中实现并行快速排序,我遇到了一个奇怪的问题。我的代码的相关部分,供参考(并提前道歉):
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <math.h>
#include <cilk/cilk.h>
#include <cilk/cilk_api.h>
void prefixSum(int* input,int* output, int length){
if(length == 1){
output[0] = input[0];
return;
}
int i;
int nPairs = (int) floor(((double)length)/2);
int pairs = (int) ceil(((double)length)/2);
int z[pairs];
int w[pairs];
cilk_for(i=0;i<nPairs;i++){
z[i] = input[2*i]+input[2*i+1];
}
if(pairs>nPairs){
z[pairs-1] = input[length-1];
}
prefixSum(z,w,pairs);
cilk_for(i=0;i<length;i++){
if(i==0){
output[i] = input[i];
}
else if((i-1)%2==0){
output[i] = w[(i-1)/2];
}
else{
output[i] = w[(i-2)/2] + input[i];
}
}
return;
}
void prefixScan(int* input, int length){
int i;
for(i=length-1;i>0;i--){
input[i] = input[i-1];
}
input[0] = 0;
}
void paraSort(double* array, int length){
if(length==1){
return;
}
int pivot = rand() % length;
int lowSet[length];
int highSet[length];
int equalSet[length];
int i;
cilk_for(i=0;i<length;i++){
if(array[i]==array[pivot]){
lowSet[i] = 0;
highSet[i] = 0;
equalSet[i] = 1;
} else if(array[i]<array[pivot]){
lowSet[i] = 1;
highSet[i] = 0;
equalSet[i] = 0;
} else {
lowSet[i] = 0;
highSet[i] = 1;
equalSet[i] = 0;
}
}
int lowIndex[length];
int highIndex[length];
int equalIndex[length];
prefixSum(lowSet,lowIndex,length);
int numLow = lowIndex[length-1];
prefixScan(lowIndex,length);
prefixSum(highSet,highIndex,length);
int numHigh = highIndex[length-1];
prefixScan(highIndex,length);
prefixSum(equalSet,equalIndex,length);
int numEqual = equalIndex[length-1];
prefixScan(equalIndex,length);
double lowList[imin(numLow,1)];
double highList[imin(numHigh,1)];
double equalList[numEqual];
cilk_for(i=0;i<length;i++){
if(lowSet[i]==1){
lowList[lowIndex[i]] = array[i];
} else if(highSet[i]==1){
highList[highIndex[i]] = array[i];
} else if(equalSet[i]==1){
equalList[equalIndex[i]] = array[i];
}
}
if(numLow>0 && numHigh>0){
cilk_spawn paraSort(lowList,numLow);
paraSort(highList,numHigh);
cilk_sync;
} else if(numLow==0 && numHigh>0){
paraSort(highList,numHigh);
} else if(numLow>0 && numHigh==0){
paraSort(lowList,numLow);
}
cilk_for(i=0;i<length;i++){
if(i<numLow){
array[i] = lowList[i];
} else if(i<numLow+numEqual){
array[i] = equalList[i-numLow];
} else {
array[i] = highList[i-(numLow+numEqual)];
}
}
return;
}
现在,当我在一个包含 50 个元素的测试用例上运行它时(为了便于调试,依次进行),我在递归中深入一层,然后遇到一个分段错误,这似乎是由行equalList[equalIndex[i]] = array[i];
。
进一步检查,在分配 equalIndex 之后,其中的值是完全任意的。这是可以预料的;我还没有分配任何东西。prefixSum 在一个元素列表上调用,除了倒数第二个元素,它是 1 之外的元素都为零。(这是一个标记元素等于枢轴的位图。)它将前缀求和运算的结果放入 equalIndex 中,我将其作为指向数组的指针传入,以便结果在调用之外持续存在。
完成此操作后,调试 printf 命令显示 equalIndex 现在全为零,除了最后两个元素,它们都是一。这是预期的前缀和结果;到目前为止,一切都很好。prefixScan 是一个简单的辅助函数,可以帮助我从零开始处理索引;它将给定数组中的所有元素向右移动一个空格,用零填充第一个元素。将 equalIndex 传递给 this 后,调试语句显示 equalIndex 除了最后一个元素为 1 之外全为零。
问题出现的地方紧随其后,在 cilk_for 循环中,将每个元素复制到其正确的数组中。在这个循环的主体中,printf 语句现在显示的值与之前的值完全不匹配——有些是正确的零,有些是我之前看到的那种非常大的正整数或负整数,在我用 prefixSum 初始化这个数组之前. 一旦它达到这些极值之一并尝试将其用作数组索引,程序就会崩溃。
我最好的猜测是,不知何故,equalIndex 中的值没有被正确分配(因此出现了奇怪的行为,就好像我没有初始化数组一样),但我很难弄清楚到底哪里出了问题。