2

我是 OpenMP 初学者。我遇到这样的问题。

我有一个M长度为 的掩码数组N,其元素为either 0 or 1。我希望提取所有i满足的索引M[i]=1并将它们存储到一个新的数组T中。

这个问题可以通过 OpenMP 加速吗?

我试过以下代码。但这不是性能有效的。

int count = 0;
#pragma omp parallel for
for(int i = 0; i < N; ++i) {
    if(M[i] == hashtag) {
        int pos = 0;
        #pragma omp critical (c1)
        pos = count++;
        T[pos] = i;
}
4

3 回答 3

4

我不是 100% 肯定这会好得多,但您可以尝试以下方法:

int count = 0;
#pragma omp parallel for
for(int i = 0; i < N; ++i) {
    if(M[i]) {
        #pragma omp atomic
        T[count++] = i;
    }
}

如果数组非常稀疏,线程将能够在不等待其他线程的情况下通过大量零。但是您一次只能更新一个索引。问题实际上是不同的线程正在写入同一个内存块(T),这意味着您将遇到缓存问题:每次一个线程写入时T,所有其他内核的缓存都是“脏”的 - 所以当尝试修改它,很多洗牌在幕后进行。所有这一切对你来说都是透明的(你不需要编写代码来处理它)但它会显着减慢速度 - 我怀疑这是你真正的瓶颈。如果您的矩阵足够大,值得您花时间,您可以尝试执行以下操作:

  1. 创建与线程一样多的数组 T
  2. 让每个线程更新自己的 T 版本
  3. 循环完成后将所有 T 数组合并为一个

它可能会更快(因为不同的线程不会写入相同的内存) - 但由于循环内的语句很少,我怀疑它不会。

编辑我创建了一个完整的测试程序,发现了两件事。首先,该atomic指令并非在所有版本中都有效omp,您甚至可能不得不使用T[count++] += i;它来编译(这没关系,因为 T 最初可以设置为全零);更麻烦的是,如果你这样做,你将不会得到两次相同的答案(count从一次到下一次的变化的最终值);如果您使用critical,则不会发生这种情况。

第二个观察结果是,当您增加线程数时,程序的速度确实会变慢,这证实了我对共享内存的怀疑(处理 10M 元素的时间:

threads  elapsed
  1        0.09s
  2        0.73s
  3        1.21s
  4        1.62s
  5        2.34s

你可以通过改变稀疏矩阵的方式看到这是真的M——当我创建M一个随机数组并测试M[i] < 0.01 * RAND_MAX(0.1% 密集矩阵)时,事情运行得比我让它 10% 密集时快得多——表明里面的部分该critical部分确实让我们放慢了速度。

在这种情况下,我认为没有办法在 OMP 中加速这项任务 - 最后将所有线程的输出合并到一个列表中的工作只会消耗你可能获得的任何速度优势考虑到内部循环内部发生的事情很少。因此,我建议您尽可能高效地重写循环,而不是使用多个线程 - 例如:

for( i = 0; i < N; i++) {
  T[count] = i;
  count += M[i];
}

在我的快速基准测试中,这比 OMP 解决方案更快 - 与该threads = 1解决方案相当。再次 - 这是因为这里访问内存的方式。请注意,我避免使用if语句 - 这会使代码尽可能快。相反,我利用了M[i]始终为零或一的事实。在循环结束时,您必须丢弃元素T[count],因为它将无效......“好元素”是T[0] ... T[count-1]. M在我的机器上,这个循环在 ~ 0.02 秒内处理了一个包含 10M 元素的数组。应该足以满足大多数目的吗?

于 2013-08-07T14:16:15.653 回答
2

基于 Floris 的快速功能,我尝试查看是否可以找到一种方法来使用 OpenMP 找到更快的解决方案。我想出了两个函数foo_v2foo_v3它们对于较大的数组foo_v2更快,与密度无关foo_v3更快,并且对于更稀疏的数组更快。该函数foo_v2本质上创建了一个具有宽度的二维数组N*nthreads以及一个countsa包含每个线程的计数的数组。用代码更好地解释这一点。下面的代码将遍历所有写入 T 的元素。

for(int ithread=0; ithread<nthreads; ithread++) {
    for(int i=0; i<counta[ithread]; i++) {
        T[ithread*N/nthread + i]
    } 
}

该函数foo_v3根据要求创建一维数组。在所有情况下N都必须非常大才能克服 OpenMP 开销。下面的代码默认为 256MB,密度M约为 10%。在我的 4 核 Sandy Bridge 系统上,OpenMP 函数的速度都快了 2 倍以上。如果将密度设置为 50% foo_v2,则速度仍会提高大约 2 倍,但foo_v3不再更快。

#include <stdio.h>
#include <stdlib.h>
#include <omp.h>

int foo_v1(int *M, int *T, const int N) {   
    int count = 0;
    for(int i = 0; i<N; i++) {
        T[count] = i;
        count += M[i];
    }
    return count;
}

int foo_v2(int *M, int *T, int *&counta, const int N) {
    int nthreads;
    #pragma omp parallel
    {
        nthreads = omp_get_num_threads();
        const int ithread = omp_get_thread_num();   
        #pragma omp single
        counta = new int[nthreads];
        int count_private = 0;      
        #pragma omp for
        for(int i = 0; i<N; i++) {
            T[ithread*N/nthreads + count_private] = i;
            count_private += M[i];
        }
        counta[ithread] = count_private;
    }
    return nthreads;
}

int foo_v3(int *M, int *T, const int N) {
    int count = 0;

    int *counta = 0;    
    #pragma omp parallel reduction(+:count)
    {
        const int nthreads = omp_get_num_threads();
        const int ithread = omp_get_thread_num();   
        #pragma omp single
        {
            counta = new int[nthreads+1];
            counta[0] = 0;
        }
        int *Tprivate = new int[N/nthreads];

        int count_private = 0;      
        #pragma omp for nowait
        for(int i = 0; i<N; i++) {
            Tprivate[count_private] = i;
            count_private += M[i];
        }
        counta[ithread+1] = count_private;
        count += count_private;

        #pragma omp barrier
        int offset = 0;
        for(int i=0; i<(ithread+1); i++) {
            offset += counta[i];
        }

        for(int i=0; i<count_private; i++) {
            T[offset + i] = Tprivate[i];
        }

        delete[] Tprivate;
    }
    delete[] counta;
    return count;
}

void compare(const int *T1, const int *T2, const int N, const int count, const int *counta, const int nthreads) {
    int diff = 0;
    int n = 0;
    for(int ithread=0; ithread<nthreads; ithread++) {
        for(int i=0; i<counta[ithread]; i++) {
            int i2 = N*ithread/nthreads+i;
            //printf("%d %d\n", T1[n], T2[i2]);
            int tmp = T1[n++] - T2[i2];
            if(tmp<0) tmp*=-1;
            diff += tmp;
        }
    }
    printf("diff %d\n", diff);
}

void compare_v2(const int *T1, const int *T2, const int count) {
    int diff = 0;
    int n = 0;
    for(int i=0; i<count; i++) {
        int tmp = T1[i] - T2[i];
        //if(tmp!=0) printf("%i %d %d\n", i, T1[i], T2[i]);
        if(tmp<0) tmp*=-1;
        diff += tmp;
    }
    printf("diff %d\n", diff);
}

int main() {
    const int N = 1 << 26;

    printf("%f MB\n", 4.0*N/1024/1024);
    int *M = new int[N];
    int *T1 = new int[N];
    int *T2 = new int[N];
    int *T3 = new int[N];

    int *counta;
    double dtime;
    for(int i=0; i<N; i++) {
        M[i] = ((rand()%10)==0);
    }

    //int repeat = 10000;
    int repeat = 1;
    int count1, count2;
    int nthreads;
    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) count1 = foo_v1(M, T1, N);
    dtime = omp_get_wtime() - dtime;
    printf("time v1 %f\n", dtime);

    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) nthreads = foo_v2(M, T2, counta, N);
    dtime = omp_get_wtime() - dtime;
    printf("time v2 %f\n", dtime);
    compare(T1, T2, N, count1, counta, nthreads);

    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) count2 = foo_v3(M, T3, N);
    dtime = omp_get_wtime() - dtime;
    printf("time v2 %f\n", dtime);
    printf("count1 %d, count2 %d\n", count1, count2);
    compare_v2(T1, T3, count1); 

}
于 2013-08-16T10:00:32.997 回答
1

The critical operation should be atomic instead of critical; actually in your case you have to use the atomic capture clause:

int pos, count = 0;                     // pos declared outside the loop
#pragma omp parallel for private(pos)   // and privatized, count is implicitly
for(int i = 0; i < N; ++i) {            // shared by all the threads
    if(M[i]) {
        #pragma omp atomic capture
            pos = count++;
        T[pos] = i;
    }
}

Take a look at this answer to have an overview over all the possible possibilities of atomic operations with OpenMP.

于 2013-08-08T14:13:34.180 回答