9

我在这篇论文中看到了一种并行合并排序算法。这是代码:

void mergesort_parallel_omp (int a[], int size, int temp[], int threads) 
{  
    if ( threads == 1)       { mergesort_serial(a, size, temp); }
    else if (threads > 1) 
    {
         #pragma omp parallel sections
         {
             #pragma omp section
             mergesort_parallel_omp(a, size/2, temp, threads/2);
             #pragma omp section
             mergesort_parallel_omp(a + size/2, size - size/2, temp + size/2, threads - threads/2);
         }
         merge(a, size, temp); 
    } // threads > 1
}

我在多核上运行它。发生的情况是,在树的叶子上,2 个线程并行运行。在他们完成工作后,其他 2 个线程开始,依此类推。即使我们有所有叶节点的空闲核心。

我认为原因是这个 OpenMP 代码不会在并行区域内创建并行区域。我对么?

4

2 回答 2

15

我认为原因是 OpenMP 无法在并行区域内创建并行区域

你可以有一个平行区域的平行区域。

OpenMP 并行区域可以相互嵌套。如果嵌套并行被禁用,则由在并行区域内遇到并行构造的线程创建的新组 仅由遇到的线程组成。如果启用了嵌套并行,那么新团队可能包含多个线程)。

为了正确运行您的代码,您需要调用omp_set_nested(1)and omp_set_num_threads(2)

可以通过设置 OMP_NESTED 环境变量或调用 omp_set_nested() 函数来启用或禁用嵌套并行


为了获得更好的性能而不是部分,您可以使用 OpenMP 任务(可以在此处找到详细信息和示例),如下所示:

void merge(int * X, int n, int * tmp) {
   ...
} 

void mergeSort(int *X, int n, int *tmp)
{  
   if (n < 2) return;
   
   #pragma omp task shared(X) if (n > TASK_SIZE)
   mergeSort(X, n/2, tmp);
   
   #pragma omp task shared(X) if (n > TASK_SIZE)
   mergeSort(X+(n/2), n-(n/2), tmp + n/2);
   
   #pragma omp taskwait
   mergeSortAux(X, n, tmp);
}



int main()
{
   ...
   #pragma omp parallel
   {
      #pragma omp single
      mergesort(data, n, tmp);
   }
} 

合并算法的顺序代码来自 Dr. Johnnie W. Baker网页。. 但是,我在此答案中提供的代码有一些更正和性能改进。

一个完整的运行示例:

#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <omp.h>

#define TASK_SIZE 100

unsigned int rand_interval(unsigned int min, unsigned int max)
{
    // https://stackoverflow.com/questions/2509679/
    int r;
    const unsigned int range = 1 + max - min;
    const unsigned int buckets = RAND_MAX / range;
    const unsigned int limit = buckets * range;

    do
    {
        r = rand();
    } 
    while (r >= limit);

    return min + (r / buckets);
}

void fillupRandomly (int *m, int size, unsigned int min, unsigned int max){
    for (int i = 0; i < size; i++)
    m[i] = rand_interval(min, max);
} 

void mergeSortAux(int *X, int n, int *tmp) {
   int i = 0;
   int j = n/2;
   int ti = 0;

   while (i<n/2 && j<n) {
      if (X[i] < X[j]) {
         tmp[ti] = X[i];
         ti++; i++;
      } else {
         tmp[ti] = X[j];
         ti++; j++;
      }
   }
   while (i<n/2) { /* finish up lower half */
      tmp[ti] = X[i];
      ti++; i++;
   }
   while (j<n) { /* finish up upper half */
      tmp[ti] = X[j];
      ti++; j++;
   }
   memcpy(X, tmp, n*sizeof(int));
} 

void mergeSort(int *X, int n, int *tmp)
{
   if (n < 2) return;

   #pragma omp task shared(X) if (n > TASK_SIZE)
   mergeSort(X, n/2, tmp);

   #pragma omp task shared(X) if (n > TASK_SIZE)
   mergeSort(X+(n/2), n-(n/2), tmp + n/2);

   #pragma omp taskwait
   mergeSortAux(X, n, tmp);
}

void init(int *a, int size){
   for(int i = 0; i < size; i++)
       a[i] = 0;
}

void printArray(int *a, int size){
   for(int i = 0; i < size; i++)
       printf("%d ", a[i]);
   printf("\n");
}

int isSorted(int *a, int size){
   for(int i = 0; i < size - 1; i++)
      if(a[i] > a[i + 1])
        return 0;
   return 1;
}

int main(int argc, char *argv[]) {
        srand(123456);
        int N  = (argc > 1) ? atoi(argv[1]) : 10;
        int print = (argc > 2) ? atoi(argv[2]) : 0;
        int numThreads = (argc > 3) ? atoi(argv[3]) : 2;
        int *X = malloc(N * sizeof(int));
        int *tmp = malloc(N * sizeof(int));

        omp_set_dynamic(0);              /** Explicitly disable dynamic teams **/
        omp_set_num_threads(numThreads); /** Use N threads for all parallel regions **/

         // Dealing with fail memory allocation
        if(!X || !tmp)
        { 
           if(X) free(X);
           if(tmp) free(tmp);
           return (EXIT_FAILURE);
        }

        fillupRandomly (X, N, 0, 5);

        double begin = omp_get_wtime();
        #pragma omp parallel
        {
            #pragma omp single
            mergeSort(X, N, tmp);
        }   
        double end = omp_get_wtime();
        printf("Time: %f (s) \n",end-begin);
    
        assert(1 == isSorted(X, N));

        if(print){
           printArray(X, N);
        }

        free(X);
        free(tmp);
        return (EXIT_SUCCESS);
}

4 核机器中的 had-doc 基准测试产生以下结果:

100000000 elements 
1 thread : Time: 11.052081 (s)
2 threads: Time: 5.907508  (s)
4 threads: Time: 4.984839  (s)

A overall Speed up of 2.21x

未来的改进将在GitHub 上提供。


可以在此处找到并行版本的高级 C++ 版本。最终算法如下所示:

void mergeSortRecursive(vector<double>& v, unsigned long left, unsigned long right) {
   if (left < right) {
      if (right-left >= 32) {
         unsigned long mid = (left+right)/2; 
         #pragma omp taskgroup
         {
            #pragma omp task shared(v) untied if(right-left >= (1<<14))
            mergeSortRecursive(v, left, mid);
            #pragma omp task shared(v) untied if(right-left >= (1<<14))
            mergeSortRecursive(v, mid+1, right);
            #pragma omp taskyield
         }
         inplace_merge(v.begin()+left, v.begin()+mid+1, v.begin()+right+1);
      }else{
         sort(v.begin()+left, v.begin()+right+1);
     }
    }
  }
}


void mergeSort(vector<double>& v) { 
     #pragma omp parallel
     #pragma omp single
     mergeSortRecursive(v, 0, v.size()-1); 
}

报告的加速6.61x为 48 个线程。

于 2012-12-10T23:51:52.283 回答
5

这个问题的现代答案是使用任务而不是部分。任务是在 OpenMP 3.0 (2009) 中添加的,并且比嵌套并行和部分工作得更好/更容易,因为嵌套并行会导致超额订阅(比可用 CPU 更多的活动线程),从而导致性能显着下降。对于任务,您有一组与 CPU 数量相匹配的线程,它们将处理这些任务。所以你不需要手动处理threads参数。一个简单的解决方案如下所示:

// span parallel region outside once outside
void mergesort_omp(...) {
    #pragma omp parallel
    #pragma omp single
    mergesort_parallel_omp(...)
}


void mergesort_parallel_omp (int a[], int size, int temp[]) 
{  
    #pragma omp task
    mergesort_parallel_omp(a, size/2, temp);

    mergesort_parallel_omp(a + size/2, size - size/2, temp + size/2);

    #pragma omp taskwait
    merge(a, size, temp); 
}

但是,为太小的工作块创建任务仍然可能会出现问题,因此根据工作粒度限制并行度很有用,例如:

void mergesort_parallel_omp (int a[], int size, int temp[]) 
{  
    if (size < size_threshold) {
        mergesort_serial(a, size, temp);
        return;
    }
    #pragma omp task
    mergesort_parallel_omp(a, size/2, temp);

    mergesort_parallel_omp(a + size/2, size - size/2, temp + size/2);

    #pragma omp taskwait
    merge(a, size, temp); 
}
于 2017-11-26T11:06:39.777 回答