正如正确指出的那样,您的代码中有几个错误会阻止其正确执行,所以我首先建议检查这些错误。
无论如何,仅考虑 OpenMP 性能如何随线程扩展,也许基于任务指令的实现会更好,因为它克服了先前答案已经指出的限制:
由于sections指令只有两个部分,我认为你不会从在并行子句中产生多于两个的线程中获得任何好处
您可以在下面找到此类实现的跟踪:
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/time.h>
void getTime(double *t) {
struct timeval tv;
gettimeofday(&tv, 0);
*t = tv.tv_sec + (tv.tv_usec * 1e-6);
}
int compare( const void * pa, const void * pb ) {
const int a = *((const int*) pa);
const int b = *((const int*) pb);
return (a-b);
}
void merge(int * array, int * workspace, int low, int mid, int high) {
int i = low;
int j = mid + 1;
int l = low;
while( (l <= mid) && (j <= high) ) {
if( array[l] <= array[j] ) {
workspace[i] = array[l];
l++;
} else {
workspace[i] = array[j];
j++;
}
i++;
}
if (l > mid) {
for(int k=j; k <= high; k++) {
workspace[i]=array[k];
i++;
}
} else {
for(int k=l; k <= mid; k++) {
workspace[i]=array[k];
i++;
}
}
for(int k=low; k <= high; k++) {
array[k] = workspace[k];
}
}
void mergesort_impl(int array[],int workspace[],int low,int high) {
const int threshold = 1000000;
if( high - low > threshold ) {
int mid = (low+high)/2;
/* Recursively sort on halves */
#ifdef _OPENMP
#pragma omp task
#endif
mergesort_impl(array,workspace,low,mid);
#ifdef _OPENMP
#pragma omp task
#endif
mergesort_impl(array,workspace,mid+1,high);
#ifdef _OPENMP
#pragma omp taskwait
#endif
/* Merge the two sorted halves */
#ifdef _OPENMP
#pragma omp task
#endif
merge(array,workspace,low,mid,high);
#ifdef _OPENMP
#pragma omp taskwait
#endif
} else if (high - low > 0) {
/* Coarsen the base case */
qsort(&array[low],high-low+1,sizeof(int),compare);
}
}
void mergesort(int array[],int workspace[],int low,int high) {
#ifdef _OPENMP
#pragma omp parallel
#endif
{
#ifdef _OPENMP
#pragma omp single nowait
#endif
mergesort_impl(array,workspace,low,high);
}
}
const size_t largest = 100000000;
const size_t length = 10000000;
int main(int argc, char *argv[]) {
int * array = NULL;
int * workspace = NULL;
double start,end;
printf("Largest random number generated: %d \n",RAND_MAX);
printf("Largest random number after truncation: %d \n",largest);
printf("Array size: %d \n",length);
/* Allocate and initialize random vector */
array = (int*) malloc(length*sizeof(int));
workspace = (int*) malloc(length*sizeof(int));
for( int ii = 0; ii < length; ii++)
array[ii] = rand()%largest;
/* Sort */
getTime(&start);
mergesort(array,workspace,0,length-1);
getTime(&end);
printf("Elapsed time sorting: %g sec.\n", end-start);
/* Check result */
for( int ii = 1; ii < length; ii++) {
if( array[ii] < array[ii-1] ) printf("Error:\n%d %d\n%d %d\n",ii-1,array[ii-1],ii,array[ii]);
}
free(array);
free(workspace);
return 0;
}
请注意,如果您寻求性能,您还必须保证递归的基本情况足够粗略,以避免由于递归函数调用而产生的大量开销。除此之外,我建议对您的代码进行概要分析,以便您可以很好地提示哪些部分真正值得优化。