
According to the documentation, the cufftSetStream() function

Associates a CUDA stream with a cuFFT plan. All kernel launches made during plan execution are now done through the associated stream [...until...] the stream is changed with another call to cufftSetStream().
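
Taken at face value, that suggests a single plan can simply be re-pointed at a different stream before each execution. A minimal sketch of that documented pattern (plan, stream, and buffer setup elided; d_in and d_out are hypothetical per-stream device buffers):

// single plan, re-associated with a different stream before each execution
for (int k = 0; k < nstreams; ++k) {
    ck( cufftSetStream(plan, streams[k]) );      // kernel launches now go to streams[k]
    ck( cufftExecC2R(plan, d_in[k], d_out[k]) ); // asynchronous with respect to the host
}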

Unfortunately, the results turn into garbage. Here is an example that demonstrates this by performing a batch of transforms two ways: once with each stream having its own dedicated plan, and once with a single plan being reused as the documentation above suggests. The former behaves as expected; the reuse/cufftSetStream approach has errors in most of the transforms. I observed this on the two cards I tried (GTX 750 Ti, Titan X) on CentOS 7 Linux, with Cuda compilation tools release 7.0, V7.0.27 and release 7.5, V7.5.17.

Edit: see the "FIX" comments below for one way to resolve the problem.
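
In condensed form, that fix serializes access to the shared plan with an event, so that each execution waits until the previous execution is finished with the plan's internal buffers (a sketch assembled from the FIX comments in the code below):

cudaEvent_t ev;
ck( cudaEventCreateWithFlags(&ev, cudaEventDisableTiming) );
// ... then, inside the per-iteration loop:
ck( cudaStreamWaitEvent(streams[k], ev, 0) );        // wait for the prior exec to release the plan
ck( cufftSetStream(plans[0], streams[k]) );
ck( cufftExecC2R(plans[0], inbufs[k], outbufs[k]) );
ck( cudaEventRecord(ev, streams[k]) );               // publish when this exec is done with the plan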

#include <cufft.h>
#include <stdexcept>
#include <iostream>
#include <numeric>
#include <vector>

#define ck(cmd) if ( cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);}


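// fill each row of a pitched, batched complex buffer with a deterministic seed-dependent pattern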
__global__
void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y)
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x)
            buf[i*stride + j] = make_cuFloatComplex( (i+seed)%101 - 50,(j+seed)%41-20);
}

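// compare two pitched real buffers element-wise and count gross mismatches in *errors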
__global__
void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) {
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) {
            float e=buf1[i*stride+j] - buf2[i*stride+j];
            if (e*e > 1) // gross error
                atomicAdd(errors,1);
        }
    }
}

void demo(bool reuse_plan)
{
    if (reuse_plan)
        std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... ";
    else
        std::cout << "Giving each stream its own dedicated fft plan ... ";
    int nfft = 1024;
    int batch = 1024;
    int nstreams = 8;
    int nbins = nfft/2+1;
    int nit=100;
    size_t inpitch,outpitch;

    std::vector<cufftComplex*> inbufs(nstreams);
    std::vector<float*> outbufs(nstreams);
    std::vector<float*> checkbufs(nstreams);
    std::vector<cudaStream_t> streams(nstreams);
    std::vector<cufftHandle> plans(nstreams);
    for (int i=0;i<nstreams;++i) {
        ck( cudaStreamCreate(&streams[i]));
        ck( cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch) );
        ck( cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch));
        ck( cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch) );
        if (i==0 || reuse_plan==false)
            ck ( cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch) );
    }

    // fill the input buffers and FFT them to get a baseline for comparison
    for (int i=0;i<nstreams;++i) {
        fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i);
        ck (cudaGetLastError());
        if (reuse_plan) {
            ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i]));
        }else{
            ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i]));
            ck( cufftSetStream(plans[i],streams[i]) ); // only need to set the stream once
        }
        ck( cudaDeviceSynchronize());
    }
    // allocate a buffer for the error count
    int * errors;
    cudaMallocHost((void**)&errors,sizeof(int)*nit);
    memset(errors,0,sizeof(int)*nit);

    /* FIX: an event can protect the plan's internal buffers
    by serializing access to the plan
    cudaEvent_t ev;
    cudaEventCreateWithFlags(&ev,cudaEventDisableTiming);
    */

    // perform the FFTs and check the outputs on streams
    for (int it=0;it<nit;++it) {
        int k = it % nstreams;
        ck( cudaStreamSynchronize(streams[k]) ); // make sure any prior kernels have completed
        if (reuse_plan) {
            // FIX: ck(cudaStreamWaitEvent(streams[k],ev,0 ) );
            ck(cufftSetStream(plans[0],streams[k]));
            ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k]));
            // FIX: ck(cudaEventRecord(ev,streams[k] ) );
        }else{
            ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k]));
        }
        check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]);
        ck (cudaGetLastError());
    }
    ck(cudaDeviceSynchronize());

    // report number of errors
    int errcount=0;
    for (int it=0;it<nit;++it)
        if (errors[it])
            ++errcount;
    std::cout << errcount << " of " << nit << " transforms had errors\n";

    for (int i=0;i<nstreams;++i) {
        cudaFree(inbufs[i]);
        cudaFree(outbufs[i]);
        cudaStreamDestroy(streams[i]);
        if (i==0 || reuse_plan==false)
            cufftDestroy(plans[i]);
    }
}

int main(int argc,char ** argv)
{
    demo(false);
    demo(true);
    return 0;
}

Typical output:

Giving each stream its own dedicated fft plan ... 0 of 100 transforms had errors
Reusing the same fft plan with multiple streams via cufftSetStream ... 87 of 100 transforms had errors


1 Answer


In order to reuse plans the way you want, you need to manage cuFFT work areas manually.

Each plan has a space for intermediate calculation results. If you want to use a single plan handle for two or more different plan executions at the same time, you need to provide a temporary buffer for each concurrent cufftExec* call.

You can do this with cufftSetWorkArea - please have a look at section 3.7 of the cuFFT documentation. Section 2.2 also helps to understand how it works.
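
Isolated from the full listing below, the core of the change is: query the work-area size the plan needs, allocate one private work area per stream, disable the plan's automatic allocation, and swap in stream k's buffer before each execution:

size_t ws;
ck( cufftGetSize(plans[0], &ws) );              // work-area size this plan requires
for (int i = 0; i < nstreams; i++)
    ck( cudaMalloc(&(wk_areas[i]), ws) );       // one private work area per stream
ck( cufftSetAutoAllocation(plans[0], 0) );      // detach the plan's own work area
// ... then, before each execution on stream k:
ck( cufftSetStream(plans[0], streams[k]) );
ck( cufftSetWorkArea(plans[0], wk_areas[k]) );  // give this execution its own scratch space
ck( cufftExecC2R(plans[0], inbufs[k], outbufs[k]) );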

Here is a worked example showing the modifications to the above code:

$ cat t1241.cu
#include <cufft.h>
#include <stdexcept>
#include <iostream>
#include <numeric>
#include <vector>

#define ck(cmd) if ( cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);}


__global__
void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y)
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x)
            buf[i*stride + j] = make_cuFloatComplex( (i+seed)%101 - 50,(j+seed)%41-20);
}

__global__
void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) {
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) {
            float e=buf1[i*stride+j] - buf2[i*stride+j];
            if (e*e > 1) // gross error
                atomicAdd(errors,1);
        }
    }
}

void demo(bool reuse_plan)
{
    if (reuse_plan)
        std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... ";
    else
        std::cout << "Giving each stream its own dedicated fft plan ... ";
    int nfft = 1024;
    int batch = 1024;
    int nstreams = 8;
    int nbins = nfft/2+1;
    int nit=100;
    size_t inpitch,outpitch;

    std::vector<cufftComplex*> inbufs(nstreams);
    std::vector<float*> outbufs(nstreams);
    std::vector<float*> checkbufs(nstreams);
    std::vector<cudaStream_t> streams(nstreams);
    std::vector<cufftHandle> plans(nstreams);
    // if plan reuse, set up independent work areas
    std::vector<char *> wk_areas(nstreams);
    for (int i=0;i<nstreams;++i) {
        ck( cudaStreamCreate(&streams[i]));
        ck( cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch) );
        ck( cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch));
        ck( cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch) );
        if (i==0 || reuse_plan==false)
            ck ( cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch) );
    }
    if (reuse_plan){
      size_t ws;
      ck(cufftGetSize(plans[0], &ws));
      for (int i = 0; i < nstreams; i++)
        ck(cudaMalloc(&(wk_areas[i]), ws));
      ck(cufftSetAutoAllocation(plans[0], 0));
      ck(cufftSetWorkArea(plans[0], wk_areas[0]));
      }
    // fill the input buffers and FFT them to get a baseline for comparison
    for (int i=0;i<nstreams;++i) {
        fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i);
        ck (cudaGetLastError());
        if (reuse_plan) {
            ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i]));
        }else{
            ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i]));
            ck( cufftSetStream(plans[i],streams[i]) ); // only need to set the stream once
        }
        ck( cudaDeviceSynchronize());
    }
    // allocate a buffer for the error count
    int * errors;
    cudaMallocHost((void**)&errors,sizeof(int)*nit);
    memset(errors,0,sizeof(int)*nit);

    // perform the FFTs and check the outputs on streams
    for (int it=0;it<nit;++it) {
        int k = it % nstreams;
        ck( cudaStreamSynchronize(streams[k]) ); // make sure any prior kernels have completed
        if (reuse_plan) {
            ck(cufftSetStream(plans[0],streams[k]));
            ck(cufftSetWorkArea(plans[0], wk_areas[k])); // update work area pointer in plan
            ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k]));
        }else{
            ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k]));
        }
        check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]);
        ck (cudaGetLastError());
    }
    ck(cudaDeviceSynchronize());

    // report number of errors
    int errcount=0;
    for (int it=0;it<nit;++it)
        if (errors[it])
            ++errcount;
    std::cout << errcount << " of " << nit << " transforms had errors\n";

    for (int i=0;i<nstreams;++i) {
        cudaFree(inbufs[i]);
        cudaFree(outbufs[i]);
        cudaFree(wk_areas[i]);
        cudaStreamDestroy(streams[i]);
        if (i==0 || reuse_plan==false)
            cufftDestroy(plans[i]);
    }
}

int main(int argc,char ** argv)
{
    demo(false);
    demo(true);
    return 0;
}
$ nvcc -o t1241 t1241.cu -lcufft
$ ./t1241
Giving each stream its own dedicated fft plan ... 0 of 100 transforms had errors
Reusing the same fft plan with multiple streams via cufftSetStream ... 0 of 100 transforms had errors
$
answered 2016-08-19T21:15:44.790