10

我有两个数组,a 和 b,我想计算“最小卷积”以产生结果 c。简单的伪代码如下所示:

for i = 0 to size(a)+size(b)
    c[i] = inf
    for j = 0 to size(a)
        if (i - j >= 0) and (i - j < size(b))
            c[i] = min(c[i], a[j] + b[i-j])

(编辑:将循环更改为从 0 而不是 1 开始)

如果 min 是一个和,我们可以使用快速傅立叶变换 (FFT),但在 min 的情况下,没有这样的模拟。相反,我想通过使用 GPU (CUDA) 使这个简单的算法尽可能快。我很乐意找到执行此操作的现有代码(或在没有 FFT 的情况下实现 sum case 的代码,以便我可以根据我的目的对其进行调整),但到目前为止我的搜索还没有找到任何好的结果。我的用例将涉及大小在 1,000 到 100,000 之间的 a 和 b。

问题:

  • 是否已经存在有效执行此操作的代码?

  • 如果我要自己实现这个,在结构上,CUDA 内核应该如何看起来才能最大限度地提高效率?我尝试了一个简单的解决方案,其中每个 c[i] 都由一个单独的线程计算,但这似乎不是最好的方法。关于如何设置线程块结构和内存访问模式的任何提示?

4

3 回答 3

5

更快的版本:

__global__ void convAgB(double *a, double *b, double *c, int sa, int sb)
{
    int i = (threadIdx.x + blockIdx.x * blockDim.x);
    int idT = threadIdx.x;
    int out,j;

    __shared__ double c_local [512];

    c_local[idT] = c[i];

    out = (i > sa) ? sa : i + 1;
    j   = (i > sb) ? i - sb + 1 : 1;

    for(; j < out; j++)
    {    
       if(c_local[idT] > a[j] + b[i-j])
          c_local[idT] = a[j] + b[i-j]; 
    }   

    c[i] = c_local[idT];
} 

**Benckmark:**
Size A Size B Size C Time (s)
1000   1000   2000   0.0008
10k    10k    20k    0.0051
100k   100k   200k   0.3436
1M     1M     1M     43,327

旧版本,对于 1000 到 100000 之间的大小,我用这个简单的版本进行了测试:

__global__ void convAgB(double *a, double *b, double *c, int sa, int sb)
{
    int size = sa+sb;

    int idT = (threadIdx.x + blockIdx.x * blockDim.x);
    int out,j;


    for(int i = idT; i < size; i += blockDim.x * gridDim.x)
    {
        if(i > sa) out = sa;
        else out = i + 1;

        if(i > sb) j = i - sb + 1;
        else j = 1;


        for(; j < out; j++)
        {
                if(c[i] > a[j] + b[i-j])
                    c[i] = a[j] + b[i-j];
        }
    }
}

我填充了数组ab使用了一些随机双数和c999999(仅用于测试)。我使用您的函数(没有任何修改)验证了c数组(在 CPU 中)。

我还从内部循环中删除了条件,所以它只会测试一次。

我不是 100% 确定,但我认为以下修改是有道理的。既然你有i - j >= 0,这与 相同i >= j,这意味着一旦j > i它永远不会进入这个块'X'(从 j++ 开始):

if(c[i] > a[j] + b[i-j])
   c[i] = a[j] + b[i-j];

所以我在变量上计算out了循环条件 if i > sa,这意味着循环将在什么时候完成j == sa,如果这意味着循环将因为条件而i < sa完成(更早)。i + 1i >= j

另一个条件i - j < size(b)意味着您将开始执行块'X',当i > size(b) + 1因为j开始总是= 1。所以我们可以j输入应该开始的值,因此

if(i > sb) j = i - sb + 1;
else j = 1;

看看你是否可以用真实的数据数组测试这个版本,并给我反馈。此外,欢迎任何改进。

编辑:可以实施新的优化,但这并没有太大的区别。

if(c[i] > a[j] + b[i-j])
    c[i] = a[j] + b[i-j];

我们可以通过以下方式消除 if:

double add;
...

 for(; j < out; j++)
 {
   add = a[j] + b[i-j];
   c[i] = (c[i] < add) * c[i] + (add <= c[i]) * add;
 }

有:

if(a > b) c = b; 
else c = a; 

与 c = (a < b) * a + (b <= a) * b 相同。

如果 a > b 则 c = 0 * a + 1 * b;=> c = b; 如果 a <= b 那么 c = 1*a + 0 *b; => c = a;

**Benckmark:**
Size A Size B Size C Time (s)
1000   1000   2000   0.0013
10k    10k    20k    0.0051
100k   100k   200k   0.4436
1M     1M     1M     47,327

我正在测量从 CPU 复制到 GPU、运行内核以及从 GPU 复制到 CPU 的时间。

GPU Specifications   
Device                       Tesla C2050
CUDA Capability Major/Minor  2.0
Global Memory                2687 MB
Cores                        448 CUDA Cores
Warp size                    32
于 2012-11-06T00:48:23.393 回答
5

a另一种可能对大型有用的替代方法b是在. 使用块允许内存合并,这对于内存带宽限制操作很重要,并且可以使用相当有效的共享内存减少将每个线程的部分结果组合成最终的每个块结果。可能最好的策略是为每个 MP 启动尽可能多的块同时运行,并让每个块发出多个输出点。这消除了与启动和退出具有相对较低总指令计数的许多块相关的一些调度开销。c

如何做到这一点的一个例子:

#include <math.h>

template<int bsz>
__global__ __launch_bounds__(512)
void minconv(const float *a, int sizea, const float *b, int sizeb, float *c)
{
    __shared__ volatile float buff[bsz];
    for(int i = blockIdx.x; i<(sizea + sizeb); i+=(gridDim.x*blockDim.x)) {
        float cval = INFINITY;
        for(int j=threadIdx.x; j<sizea; j+= blockDim.x) {
            int t = i - j;
            if ((t>=0) && (t<sizeb))
                cval = min(cval, a[j] + b[t]);
        }
        buff[threadIdx.x] = cval; __syncthreads();
        if (bsz > 256) {
            if (threadIdx.x < 256) 
                buff[threadIdx.x] = min(buff[threadIdx.x], buff[threadIdx.x+256]);
            __syncthreads();
        }
        if (bsz > 128) {
            if (threadIdx.x < 128) 
                buff[threadIdx.x] = min(buff[threadIdx.x], buff[threadIdx.x+128]); 
            __syncthreads();
        }
        if (bsz > 64) {
            if (threadIdx.x < 64) 
                buff[threadIdx.x] = min(buff[threadIdx.x], buff[threadIdx.x+64]);
            __syncthreads();
        }
        if (threadIdx.x < 32) {
            buff[threadIdx.x] = min(buff[threadIdx.x], buff[threadIdx.x+32]);
            buff[threadIdx.x] = min(buff[threadIdx.x], buff[threadIdx.x+16]);
            buff[threadIdx.x] = min(buff[threadIdx.x], buff[threadIdx.x+8]);
            buff[threadIdx.x] = min(buff[threadIdx.x], buff[threadIdx.x+4]);
            buff[threadIdx.x] = min(buff[threadIdx.x], buff[threadIdx.x+2]);
            buff[threadIdx.x] = min(buff[threadIdx.x], buff[threadIdx.x+1]);
            if (threadIdx.x == 0) c[i] = buff[0];
        }
    }
}

// Instances for all valid block sizes.
template __global__ void minconv<64>(const float *, int, const float *, int, float *);
template __global__ void minconv<128>(const float *, int, const float *, int, float *);
template __global__ void minconv<256>(const float *, int, const float *, int, float *);
template __global__ void minconv<512>(const float *, int, const float *, int, float *);

[免责声明:未经测试或基准测试,使用风险自负]

这是单精度浮点,但同样的想法应该适用于双精度浮点。对于整数,您需要将 C99INFINITY宏替换为类似的东西INT_MAXor LONG_MAX,但原理保持不变。

于 2012-11-06T22:26:21.123 回答
2

我用过你的算法。我想它会帮助你。

const int Length=1000;

__global__ void OneD(float *Ad,float *Bd,float *Cd){
    int i=blockIdx.x;
    int j=threadIdx.x;
    Cd[i]=99999.99;
    for(int k=0;k<Length/500;k++){
        while(((i-j)>=0)&&(i-j<Length)&&Cd[i+k*Length]>Ad[j+k*Length]+Bd[i-j]){
            Cd[i+k*Length]=Ad[j+k*Length]+Bd[i-j];
    }}}

我已经采取了每个块的500线程。并且,每个网格块。因为,我的设备中每个块的线程数限制为,所以我使用了线程。我将所有数组的大小设为(=1000)。500512500Length

在职的:

  1. i存储块索引并j 存储线程索引。

  2. for线程数小于数组大小时使用循环。

  3. while 循环用于迭代Cd[n]

  4. 我没有使用共享内存,因为我占用了很多块和线程。因此,每个块所需的共享内存量很低。

PS:如果您的设备支持更多线程和块,请替换k<Length/500k<Length/(supported number of threads)

于 2012-11-07T18:58:43.933 回答