1

我从 Cormen 的著名文本中实现了一个并行合并排序算法。我使用 pthreads 用 C 语言编写它,并在 Win7 x64 上使用 MinGW 编译(后来也在 Ubuntu 中使用 GCC 进行了测试,结果相同)。我在并行化方面的第一种方法是幼稚的……我在每个递归级别都生成了一个新线程(这实际上是 Cormen 的伪代码所暗示的)。然而,这通常会导致耗时过长或由于分段错误而崩溃(我可以假设系统可以处理多少线程有一些硬性限制)。这似乎是递归并行化的常见新手错误,实际上我发现了一个类似的DISCUSSION在这个网站上。所以我改为在该线程中使用建议,即为问题大小设置阈值,如果为生成新线程的函数提供了一个小于阈值的集合(比如 10,000 个元素),那么它只是直接对元素进行操作,而不是为这么小的一组创建一个新线程。

现在一切似乎都运行良好。我在下面列出了我的一些结果。N 是问题大小(一组整数 [1, 2, 3, ..., N] 彻底打乱),阈值是我的并行排序和并行合并函数拒绝生成新线程的值。第一个表以毫秒为单位显示排序时间,第二个表显示在每种情况下产生了多少排序/合并工作线程。查看底部表格中的 N=1E6 和 N=1E7 行,您可以看到每当我降低阈值以允许超过 ~8000 个合并工作人员时,我都会遇到分段错误。同样,我认为这是由于系统对线程的某些限制,我很乐意听到更多关于这一点的信息,但这不是我的主要问题。

主要问题是,为什么在尝试使用相当高的阈值时最后一行会出现段错误,这会产生预期的 15/33 个工作线程(遵循前一行的模式)。当然,这不是我的系统要处理的太多线程。完成的一个实例(表中右下角的单元格)使用了大约 1.2GB RAM(我的系统有 6GB),与每行右侧有 0 个线程的版本相比,线程版本似乎从未占用更多 RAM。

  • 我认为我没有达到任何类型的堆限制......大量可用的 RAM,即使允许它产生 15/33 线程,它也应该只需要 ~1GB。
  • 我也不认为这是一个堆栈问题。我将程序设计为使用最小堆栈,并且我认为每个线程的占用空间根本与问题大小 N 无关,仅与堆有关。我对此非常缺乏经验......但是我在gdb中做了一个核心转储堆栈回溯,并且从堆栈顶部到底部的地址似乎足够接近以排除那里的溢出。
  • 我尝试读取 pthread_create 的返回值...在 Windows 中我在崩溃前几次得到了 11 的值(但它似乎没有触发崩溃,因为有几个 11,然后是几个 0,即没有错误,然后是另一个 11)。该错误代码是EAGAIN,资源不可用......但我不确定它在这里的真正含义。此外,在 Ubuntu 中,错误代码每次都是 0,甚至直到崩溃。
  • 我尝试了 Valgrind 并收到了很多关于内存泄漏的消息,但我不确定这些是否合法,因为我知道 Valgrind 需要额外的资源,而且我能够在没有 Valgrind 的情况下正常工作的其他问题集大小上得到这些类型的错误。

很明显,它与问题大小和系统资源有关……我希望我缺少一些常识,可以使答案非常清楚。

有任何想法吗?对不起,长长的文字墙……谢谢你读到这里!如果看起来相关,我可以发布来源。

在此处输入图像描述

编辑:添加源以供参考:

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>

const int               N = 100000000;
const int  SORT_THRESHOLD = 10000000;
const int MERGE_THRESHOLD = 10000000;

int  sort_thread_count = 0;
int merge_thread_count = 0;

typedef struct s_pmergesort_args {
    int *vals_in, p, r, *vals_out, s;
} pmergesort_args;

typedef struct s_pmerge_args {
    int *temp, p1, r1, p2, r2, *vals_out, p3;
} pmerge_args;

void *p_merge_sort(void *v_pmsa);
void *p_merge(void *v_pma);
int binary_search(int val, int *temp, int p, int r);

int main() {
    int *values, i, rand1, rand2, temp, *sorted;
    long long rand1a, rand1b, rand2a, rand2b;
    struct timeval start, end;

    /* allocate values on heap and initialize */
    values = malloc(N * sizeof(int));
    sorted = malloc(N * sizeof(int));
    for (i = 0; i < N; i++) {
        values[i] = i + 1;
        sorted[i] = 0;
    }

    /* scramble
     *  - complicated logic to maximize swapping
     *  - lots of testing (not shown) was done to verify optimal swapping */
    srand(time(NULL));
    for (i = 0; i < N/10; i++) {
        rand1a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand1b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand1 = (int)((rand1a * rand1b + rand()) % N);
        rand2a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand2b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand2 = (int)((rand2a * rand2b + rand()) % N);
        temp = values[rand1];
        values[rand1] = values[rand2];
        values[rand2] = temp;
    }

    /* set up args for p_merge_sort */
    pmergesort_args pmsa;
    pmsa.vals_in = values;
    pmsa.p = 0;
    pmsa.r = N-1;
    pmsa.vals_out = sorted;
    pmsa.s = 0;

    /* sort */
    gettimeofday(&start, NULL);
    p_merge_sort(&pmsa);
    gettimeofday(&end, NULL);

    /* verify sorting */
    for (i = 1; i < N; i++) {
        if (sorted[i] < sorted[i-1]) {
            fprintf(stderr, "Error: array is not sorted.\n");
            exit(0);
        }
    }
    printf("Success: array is sorted.\n");
    printf("Sorting took %dms.\n", (int)(((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec))/1000));

    free(values);
    free(sorted);

    printf("(  sort threads created: %d )\n", sort_thread_count);
    printf("( merge threads created: %d )\n", merge_thread_count);

    return 0;
}

void *p_merge_sort(void *v_pmsa) {
    pmergesort_args pmsa = *((pmergesort_args *) v_pmsa);
    int *vals_in = pmsa.vals_in;
    int p = pmsa.p;
    int r = pmsa.r;
    int *vals_out = pmsa.vals_out;
    int s = pmsa.s;

    int n = r - p + 1;
    pthread_t worker;

    if (n > SORT_THRESHOLD) {
        sort_thread_count++;
    }

    if (n == 1) {
        vals_out[s] = vals_in[p];
    } else {
        int *temp = malloc(n * sizeof(int));
        int q = (p + r) / 2;
        int q_ = q - p + 1;

        pmergesort_args pmsa_l;
        pmsa_l.vals_in = vals_in;
        pmsa_l.p = p;
        pmsa_l.r = q;
        pmsa_l.vals_out = temp;
        pmsa_l.s = 0;

        pmergesort_args pmsa_r;
        pmsa_r.vals_in = vals_in;
        pmsa_r.p = q+1;
        pmsa_r.r = r;
        pmsa_r.vals_out = temp;
        pmsa_r.s = q_;

        if (n > SORT_THRESHOLD) {
            pthread_create(&worker, NULL, p_merge_sort, &pmsa_l);
        } else {
            p_merge_sort(&pmsa_l);
        }
        p_merge_sort(&pmsa_r);

        if (n > SORT_THRESHOLD) {
            pthread_join(worker, NULL);
        }

        pmerge_args pma;
        pma.temp = temp;
        pma.p1 = 0;
        pma.r1 = q_ - 1;
        pma.p2 = q_;
        pma.r2 = n - 1;
        pma.vals_out = vals_out;
        pma.p3 = s;
        p_merge(&pma);
        free(temp);
    }
}

void *p_merge(void *v_pma) {
    pmerge_args pma = *((pmerge_args *) v_pma);
    int *temp = pma.temp;
    int p1 = pma.p1;
    int r1 = pma.r1;
    int p2 = pma.p2;
    int r2 = pma.r2;
    int *vals_out = pma.vals_out;
    int p3 = pma.p3;

    int n1 = r1 - p1 + 1;
    int n2 = r2 - p2 + 1;
    int q1, q2, q3, t;
    pthread_t worker;

    if (n1 < n2) {
        t = p1; p1 = p2; p2 = t;
        t = r1; r1 = r2; r2 = t;
        t = n1; n1 = n2; n2 = t;
    }
    if (n1 > MERGE_THRESHOLD) {
        merge_thread_count++;
    }

    if (n1 == 0) {
        return;
    } else {

        q1 = (p1 + r1) / 2;
        q2 = binary_search(temp[q1], temp, p2, r2);
        q3 = p3 + (q1 - p1) + (q2 - p2);
        vals_out[q3] = temp[q1];

        pmerge_args pma_l;
        pma_l.temp = temp;
        pma_l.p1 = p1;
        pma_l.r1 = q1-1;
        pma_l.p2 = p2;
        pma_l.r2 = q2-1;
        pma_l.vals_out = vals_out;
        pma_l.p3 = p3;

        if (n1 > MERGE_THRESHOLD) {
            pthread_create(&worker, NULL, p_merge, &pma_l);
        } else {        
            p_merge(&pma_l);
        }        

        pmerge_args pma_r;
        pma_r.temp = temp;
        pma_r.p1 = q1+1;
        pma_r.r1 = r1;
        pma_r.p2 = q2;
        pma_r.r2 = r2;
        pma_r.vals_out = vals_out;
        pma_r.p3 = q3+1;

        p_merge(&pma_r);

        if (n1 > MERGE_THRESHOLD) {
            pthread_join(worker, NULL);
        }
    }
}

int binary_search(int val, int *temp, int p, int r) {
    int low = p;
    int mid;
    int high = (p > r+1)? p : r+1;

    while (low < high) {
        mid = (low + high) / 2;
        if (val <= temp[mid]) {
            high = mid;
        } else {
            low = mid + 1;
        }
    }
    return high;
}

编辑 2:在下面添加了新图像,显示了每个版本使用的“最大”和“总”RAM(最大表示最高同时分配/使用,总表示整个程序生命周期中所有分配请求的总和)。这些表明在 N=1E8 和 threshold=1E7 的情况下,我应该获得 3.2GB 的最大使用量(我的系统应该能够支持)。但同样......我猜它与 pthread 库中的其他一些限制有关......不是我实际的系统资源。

在此处输入图像描述

4

3 回答 3

3

看起来它的内存不足了。在您的示例中,如果代码按顺序运行,则它一次分配的最大内存为 1.6GB。使用线程时,它使用超过 3GB。我在 malloc/free 函数周围放置了一些包装器,得到了以下结果:

Allocation of 12500000 bytes failed with 3074995884 bytes already allocated.

很容易看出,线程化时内存使用量会更多。在这种情况下,它将同时对整个数组的左侧和右侧进行排序,并分配两个大的临时缓冲区来执行此操作。当顺序运行时,左半部分的临时缓冲区将在对右半部分排序之前被释放。

这是我使用的包装器:

static size_t total_allocated = 0;
static size_t max_allocated = 0;
static pthread_mutex_t total_allocated_mutex;

static void *allocate(int n)
{
  void *result = 0;
  pthread_mutex_lock(&total_allocated_mutex);
  result = malloc(n);
  if (!result) {
    fprintf(stderr,"Allocation of %d bytes failed with %u bytes already allocated\n",n,total_allocated);
  }
  assert(result);
  total_allocated += n;
  if (total_allocated>max_allocated) {
    max_allocated = total_allocated;
  }
  pthread_mutex_unlock(&total_allocated_mutex);
  return result;
}


static void *deallocate(void *p,int n)
{
  pthread_mutex_lock(&total_allocated_mutex);
  total_allocated -= n;
  free(p);
  pthread_mutex_unlock(&total_allocated_mutex);
}
于 2012-07-08T05:07:31.243 回答
2

我运行它并得到:

Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 7120.0x14dc]
0x004017df in p_merge (v_pma=0x7882c120) at t.c:177
177             vals_out[q3] = temp[q1];
(gdb) p q3
$1 = 58
(gdb) p vals_out
$2 = (int *) 0x0
(gdb) 

这是一个 NULL 指针取消引用。我会在您分配后断言temp以确保分配成功。

    int *temp = malloc(n * sizeof(int));
    assert(temp);

稍微分析一下您的算法,似乎您在递归下降时预先分配了进行合并所需的内存。您可能需要考虑更改算法以在实际执行合并时进行分配。

但是,如果我没记错的话,合并排序会在任何合并发生之前在算法的最顶部分配第二个数组,然后随着递归调用展开,随着合并运行时间的延长,它们会在两个数组之间来回翻转。这样,malloc整个算法中只有一个调用。除了使用更少的内存之外,它的性能也会更好。

我在修改代码以使用分配在算法顶部的单个分配的临时数组时的 SWAG 如下所示。

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>

const int               N = 100000000;
const int  SORT_THRESHOLD = 10000000;
const int MERGE_THRESHOLD = 10000000;

int  sort_thread_count = 0;
int merge_thread_count = 0;

typedef struct s_pmergesort_args {
    int *vals_in, p, r, *vals_out, s, *temp;
} pmergesort_args;

typedef struct s_pmerge_args {
    int *temp, p1, r1, p2, r2, *vals_out, p3;
} pmerge_args;

void *p_merge_sort(void *v_pmsa);
void *p_merge(void *v_pma);
int binary_search(int val, int *temp, int p, int r);

int main() {
    int *values, i, rand1, rand2, temp, *sorted, *scratch;
    long long rand1a, rand1b, rand2a, rand2b;
    struct timeval start, end;

    /* allocate values on heap and initialize */
    values = malloc(N * sizeof(int));
    sorted = malloc(N * sizeof(int));
    scratch = malloc(N * sizeof(int));
    for (i = 0; i < N; i++) {
        values[i] = i + 1;
        sorted[i] = 0;
    }

    /* scramble
     *  - complicated logic to maximize swapping
     *  - lots of testing (not shown) was done to verify optimal swapping */
    srand(time(NULL));
    for (i = 0; i < N/10; i++) {
        rand1a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand1b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand1 = (int)((rand1a * rand1b + rand()) % N);
        rand2a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand2b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand2 = (int)((rand2a * rand2b + rand()) % N);
        temp = values[rand1];
        values[rand1] = values[rand2];
        values[rand2] = temp;
    }

    /* set up args for p_merge_sort */
    pmergesort_args pmsa;
    pmsa.vals_in = values;
    pmsa.p = 0;
    pmsa.r = N-1;
    pmsa.vals_out = sorted;
    pmsa.s = 0;
    pmsa.temp = scratch;

    /* sort */
    gettimeofday(&start, NULL);
    p_merge_sort(&pmsa);
    gettimeofday(&end, NULL);

    /* verify sorting */
    for (i = 1; i < N; i++) {
        if (sorted[i] < sorted[i-1]) {
            fprintf(stderr, "Error: array is not sorted.\n");
            exit(0);
        }
    }
    printf("Success: array is sorted.\n");
    printf("Sorting took %dms.\n", (int)(((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec))/1000));

    free(values);
    free(sorted);
    free(scratch);

    printf("(  sort threads created: %d )\n", sort_thread_count);
    printf("( merge threads created: %d )\n", merge_thread_count);

    return 0;
}

void *p_merge_sort(void *v_pmsa) {
    pmergesort_args pmsa = *((pmergesort_args *) v_pmsa);
    int *vals_in = pmsa.vals_in;
    int p = pmsa.p;
    int r = pmsa.r;
    int *vals_out = pmsa.vals_out;
    int s = pmsa.s;
    int *scratch = pmsa.temp;

    int n = r - p + 1;
    pthread_t worker;

    if (n > SORT_THRESHOLD) {
        sort_thread_count++;
    }

    if (n == 1) {
        vals_out[s] = vals_in[p];
    } else {
        int q = (p + r) / 2;
        int q_ = q - p + 1;

        pmergesort_args pmsa_l;
        pmsa_l.vals_in = vals_in;
        pmsa_l.p = p;
        pmsa_l.r = q;
        pmsa_l.vals_out = scratch;
        pmsa_l.s = p;
        pmsa_l.temp = vals_out;

        pmergesort_args pmsa_r;
        pmsa_r.vals_in = vals_in;
        pmsa_r.p = q+1;
        pmsa_r.r = r;
        pmsa_r.vals_out = scratch;
        pmsa_r.s = q+1;
        pmsa_r.temp = vals_out;

        if (n > SORT_THRESHOLD) {
            pthread_create(&worker, NULL, p_merge_sort, &pmsa_l);
        } else {
            p_merge_sort(&pmsa_l);
        }
        p_merge_sort(&pmsa_r);

        if (n > SORT_THRESHOLD) {
            pthread_join(worker, NULL);
        }

        pmerge_args pma;
        pma.temp = scratch + p;
        pma.p1 = 0;
        pma.r1 = q_ - 1;
        pma.p2 = q_;
        pma.r2 = n - 1;
        pma.vals_out = vals_out + p;
        pma.p3 = s - p;
        p_merge(&pma);
    }
}

void *p_merge(void *v_pma) {
    pmerge_args pma = *((pmerge_args *) v_pma);
    int *temp = pma.temp;
    int p1 = pma.p1;
    int r1 = pma.r1;
    int p2 = pma.p2;
    int r2 = pma.r2;
    int *vals_out = pma.vals_out;
    int p3 = pma.p3;

    int n1 = r1 - p1 + 1;
    int n2 = r2 - p2 + 1;
    int q1, q2, q3, t;
    pthread_t worker;

    if (n1 < n2) {
        t = p1; p1 = p2; p2 = t;
        t = r1; r1 = r2; r2 = t;
        t = n1; n1 = n2; n2 = t;
    }
    if (n1 > MERGE_THRESHOLD) {
        merge_thread_count++;
    }

    if (n1 == 0) {
        return;
    } else {

        q1 = (p1 + r1) / 2;
        q2 = binary_search(temp[q1], temp, p2, r2);
        q3 = p3 + (q1 - p1) + (q2 - p2);
        vals_out[q3] = temp[q1];

        pmerge_args pma_l;
        pma_l.temp = temp;
        pma_l.p1 = p1;
        pma_l.r1 = q1-1;
        pma_l.p2 = p2;
        pma_l.r2 = q2-1;
        pma_l.vals_out = vals_out;
        pma_l.p3 = p3;

        if (n1 > MERGE_THRESHOLD) {
            pthread_create(&worker, NULL, p_merge, &pma_l);
        } else {
            p_merge(&pma_l);
        }

        pmerge_args pma_r;
        pma_r.temp = temp;
        pma_r.p1 = q1+1;
        pma_r.r1 = r1;
        pma_r.p2 = q2;
        pma_r.r2 = r2;
        pma_r.vals_out = vals_out;
        pma_r.p3 = q3+1;

        p_merge(&pma_r);

        if (n1 > MERGE_THRESHOLD) {
            pthread_join(worker, NULL);
        }
    }
}

int binary_search(int val, int *temp, int p, int r) {
    int low = p;
    int mid;
    int high = (p > r+1)? p : r+1;

    while (low < high) {
        mid = (low + high) / 2;
        if (val <= temp[mid]) {
            high = mid;
        } else {
            low = mid + 1;
        }
    }
    return high;
}
于 2012-07-08T04:28:11.203 回答
0

您对系统的压力太大了,因为加速实现的并行化没有多大意义。并行化会产生成本,当你用这样的线程淹没系统时,你的系统作为一个整体必须做很多工作,线程不是免费的。

特别是对于如果您要求太多线程时程序崩溃的“问题”,这完全是您的错:阅读pthread_create. 它声明该函数返回一个值,并且这样做是有原因的。

为了获得加速(我想这是您正在寻找的),您不能期望获得比系统中的物理内核更多的东西。有时线程比内核多一点(比如两倍)是件好事,但很快线程创建的开销就会远远超过您所能获得的。

然后,归并排序是一种算法,通常受对 RAM 的访问限制,而不是比较。RAM 访问(即使像在合并排序中那样进行流式处理)也比 CPU 慢几个数量级。此外,您的内存总线不是并行设备,您在内存访问中唯一的并行性是缓存(如果它们是的话)。将内存占用量扩大两倍,可能会扼杀所有性能提升。在您的代码中,您甚至可以通过在各个线程调用的下方分配内存来使情况变得更糟,因为分配内存本身是有成本的,因此系统必须协调这些分配。

为了让它重新开始,首先编写一个具有良好内存处理和访问模式的递归归并排序算法。只在递归的顶部节点分配一些大缓冲区,并将部分缓冲区交给递归调用。

创建一个单独的合并例程,将两个已排序的缓冲区合并为第三个。对其进行基准测试,计算算法花费的每个排序项的微秒。根据你的 CPU 的速度计算你每个排序项目浪费的周期数。阅读编译器为合并生成的汇编程序,如果您发现它看起来太复杂,请尝试找出如何改进它。

之后,开始为您的递归函数添加并行性。

于 2012-07-08T07:05:15.913 回答