
I am developing a multi-GPU accelerated flow solver. At the moment I am trying to implement communication hiding. That means that while data is being exchanged, the GPU computes the part of the mesh that is not involved in the communication, and computes the rest of the mesh once the communication has finished.

I am trying to solve this with one stream (computeStream) for the long-running kernel (fluxKernel) and one (communicationStream) for the different phases of the communication. The computeStream has a very low priority, in order to allow kernels on the communicationStream to interleave with the fluxKernel, even though it uses all resources.

These are the streams I am using:

int priority_high, priority_low;
cudaDeviceGetStreamPriorityRange(&priority_low , &priority_high ) ;
cudaStreamCreateWithPriority (&communicationStream, cudaStreamNonBlocking, priority_high );
cudaStreamCreateWithPriority (&computeStream      , cudaStreamNonBlocking, priority_low  );
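
(A small sanity check, sketched below and not part of the solver itself, can be added here: if the device does not support stream priorities, cudaDeviceGetStreamPriorityRange reports the same value (0) for both bounds, and the priority trick cannot work.)

// Sanity-check sketch: warn if the device exposes no distinct stream priorities
if( priority_low == priority_high )
    std::cerr << "Warning: stream priorities are not supported on this device.\n";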

The desired pattern of concurrency looks like this:

[Image: desired concurrency pattern]

Before sending the data via MPI, I need to synchronize the communicationStream to make sure that the data has been completely downloaded before it is sent.

In the following listing, I show the structure of what I am currently doing. First, I launch the fluxKernel for the main part of the mesh on the computeStream. Then I launch a sendKernel that collects the data that should be sent to the second GPU, and download it to the host (due to hardware limitations, I cannot use CUDA-aware MPI). The data is then sent non-blocking with MPI_Isend, and subsequently a blocking receive (MPI_Recv) is used. When the data has been received, the procedure is performed in reverse: the data is uploaded to the device and then handled by a recvKernel. Finally, fluxKernel is called for the remaining part of the mesh on the communicationStream.

Note that kernels run on the default stream before and after the code shown here.

{ ... } // Preparations

// Start main part of computation on first stream

fluxKernel<<< ..., ..., 0, computeStream >>>( /* main Part */ );

// Prepare send data

sendKernel<<< ..., ..., 0, communicationStream >>>( ... );

cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyDeviceToHost, communicationStream );
cudaStreamSynchronize( communicationStream );

// MPI Communication

MPI_Isend( ... );
MPI_Recv ( ... );

// Use received data

cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyHostToDevice, communicationStream );

recvKernel<<< ..., ..., 0, communicationStream >>>( ... );

fluxKernel<<< ..., ..., 0, communicationStream >>>( /* remaining Part */ );

{ ... } // Rest of the Computations

I used nvprof and the Visual Profiler to see whether the streams actually execute concurrently. This is the result:

[Image: results with one communication]

I observe that the sendKernel (purple), the upload, the MPI communication, and the download overlap with the fluxKernel. The recvKernel (red), however, only starts after the other stream has finished. Turning the synchronization on does not solve the problem:

[Image]

For my real application I have not only one communication, but several. I also tested this with two communications. The procedure is:

sendKernel<<< ..., ..., 0, communicationStream >>>( ... );
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyDeviceToHost, communicationStream );
cudaStreamSynchronize( communicationStream );
MPI_Isend( ... );

sendKernel<<< ..., ..., 0, communicationStream >>>( ... );
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyDeviceToHost, communicationStream );
cudaStreamSynchronize( communicationStream );
MPI_Isend( ... );

MPI_Recv ( ... );
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyHostToDevice, communicationStream );
recvKernel<<< ..., ..., 0, communicationStream >>>( ... );

MPI_Recv ( ... );
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyHostToDevice, communicationStream );
recvKernel<<< ..., ..., 0, communicationStream >>>( ... );

The result is similar to the one with a single communication (above), in the sense that the second kernel call (this time a sendKernel) is delayed until the kernel on the computeStream has finished.

[Image]

So the overall observation is that the second kernel call is delayed, independent of which kernel it is.

Can you explain why the GPU synchronizes in this way, or how I can get the second kernel on the communicationStream to also run concurrently with the computeStream?

Thank you very much.

Edit 1: Complete rework of the problem


Minimal reproducible example

I built a minimal reproducible example. In the end, the code writes the int data to the terminal. The correct last value is 32778 (= (32*1024-1) + 1 + 10). At the beginning, I added an option integer to test 3 different options:

  • 0: the intended version, with synchronization before the CPU modifies the data
  • 1: same as 0, but without the synchronization
  • 2: a dedicated stream for the memcpys and no synchronization

#include <iostream>

#include <cuda.h>
#include <cuda_runtime.h>
#include <device_launch_parameters.h>

const int option = 0;

const int numberOfEntities = 2 * 1024 * 1024;
const int smallNumberOfEntities = 32 * 1024;

__global__ void longKernel(float* dataDeviceIn, float* dataDeviceOut, int numberOfEntities)
{
    int index = blockIdx.x * blockDim.x + threadIdx.x;
    if(index >= numberOfEntities) return;

    float tmp = dataDeviceIn[index];

#pragma unroll
    for( int i = 0; i < 2000; i++ ) tmp += 1.0;

    dataDeviceOut[index] = tmp;
}

__global__ void smallKernel_1( int* smallDeviceData, int numberOfEntities )
{
    int index = blockIdx.x * blockDim.x + threadIdx.x;
    if(index >= numberOfEntities) return;

    smallDeviceData[index] = index;
}

__global__ void smallKernel_2( int* smallDeviceData, int numberOfEntities )
{
    int index = blockIdx.x * blockDim.x + threadIdx.x;
    if(index >= numberOfEntities) return;

    int value = smallDeviceData[index];

    value += 10;

    smallDeviceData[index] = value;
}


int main(int argc, char **argv)
{
    cudaSetDevice(0);

    float* dataDeviceIn;
    float* dataDeviceOut;

    cudaMalloc( &dataDeviceIn , sizeof(float) * numberOfEntities );
    cudaMalloc( &dataDeviceOut, sizeof(float) * numberOfEntities );

    int* smallDataDevice;
    int* smallDataHost;

    cudaMalloc    ( &smallDataDevice, sizeof(int) * smallNumberOfEntities );
    cudaMallocHost( &smallDataHost  , sizeof(int) * smallNumberOfEntities );

    cudaStream_t streamLong;
    cudaStream_t streamSmall;
    cudaStream_t streamCopy;

    int priority_high, priority_low;
    cudaDeviceGetStreamPriorityRange(&priority_low , &priority_high ) ;
    cudaStreamCreateWithPriority (&streamLong , cudaStreamNonBlocking, priority_low  );
    cudaStreamCreateWithPriority (&streamSmall, cudaStreamNonBlocking, priority_high );
    cudaStreamCreateWithPriority (&streamCopy , cudaStreamNonBlocking, priority_high );

    //////////////////////////////////////////////////////////////////////////

    longKernel <<< numberOfEntities / 32, 32, 0, streamLong >>> (dataDeviceIn, dataDeviceOut, numberOfEntities);

    //////////////////////////////////////////////////////////////////////////

    smallKernel_1 <<< smallNumberOfEntities / 32, 32, 0 , streamSmall >>> (smallDataDevice, smallNumberOfEntities);

    if( option <= 1 ) cudaMemcpyAsync( smallDataHost, smallDataDevice, sizeof(int) * smallNumberOfEntities, cudaMemcpyDeviceToHost, streamSmall );
    if( option == 2 ) cudaMemcpyAsync( smallDataHost, smallDataDevice, sizeof(int) * smallNumberOfEntities, cudaMemcpyDeviceToHost, streamCopy  );

    if( option == 0 ) cudaStreamSynchronize( streamSmall );

    // some CPU modification of data
    for( int i = 0; i < smallNumberOfEntities; i++ ) smallDataHost[i] += 1;

    if( option <= 1 ) cudaMemcpyAsync( smallDataDevice, smallDataHost, sizeof(int) * smallNumberOfEntities, cudaMemcpyHostToDevice, streamSmall );
    if( option == 2 ) cudaMemcpyAsync( smallDataDevice, smallDataHost, sizeof(int) * smallNumberOfEntities, cudaMemcpyHostToDevice, streamCopy  );

    smallKernel_2 <<< smallNumberOfEntities / 32, 32, 0 , streamSmall >>> (smallDataDevice, smallNumberOfEntities);

    //////////////////////////////////////////////////////////////////////////

    cudaDeviceSynchronize();

    cudaMemcpy( smallDataHost, smallDataDevice, sizeof(int) * smallNumberOfEntities, cudaMemcpyDeviceToHost );

    for( int i = 0; i < smallNumberOfEntities; i++ ) std::cout << smallDataHost[i] << "\n";

    return 0;
}

With this code I see the same behavior as described above:

Option 0 (correct result): [Image]

Option 1 (wrong result, the +1 from the CPU is missing): [Image]

Option 2 (completely wrong result, everything is 10, the download happens before smallKernel_1): [Image]


Solution:

Running option 0 under Linux (as suggested in Robert's answer) brings the expected behavior! [Image]


1 Answer


Here is how I would try to accomplish this.

  1. Use the high-priority/low-priority stream arrangement, as you suggest.
  2. Only 2 streams should be needed.
  3. Make sure the host memory is pinned, to allow compute/copy overlap.
  4. Since you don't intend to use CUDA-aware MPI, your MPI transactions are purely host activity. Therefore we can use a stream callback to insert this host activity into the high-priority stream.
  5. To make it easy for the high-priority kernels to insert themselves among the low-priority kernel, I choose a grid-stride-loop design for the high-priority copy kernels, but a non-grid-stride-loop design for the low-priority kernel. We want the low-priority kernel to have a larger number of blocks, so that blocks are launching and retiring all the time, easily allowing the GPU block scheduler to insert high-priority blocks as they become available.
  6. The work issuance for each "frame" does not use any synchronization calls of any kind. I use cudaDeviceSynchronize() once per loop/frame, to break (separate) the processing of one frame from the next. The arrangement of activities within a frame is handled entirely with CUDA stream semantics, to enforce serialization for activities that depend on each other, but to allow concurrency for activities that don't.

Here is example code that implements these ideas:

#include <iostream>
#include <unistd.h>
#include <cstdio>

#define cudaCheckErrors(msg) \
    do { \
        cudaError_t __err = cudaGetLastError(); \
        if (__err != cudaSuccess) { \
            fprintf(stderr, "Fatal error: %s (%s at %s:%d)\n", \
                msg, cudaGetErrorString(__err), \
                __FILE__, __LINE__); \
            fprintf(stderr, "*** FAILED - ABORTING\n"); \
            exit(1); \
        } \
    } while (0)

typedef double mt;
const int nTPB = 512;
const size_t ds = 100ULL*1048576;
const size_t bs = 1048576ULL;
const int  my_intensity = 1;
const int loops = 4;
const size_t host_func_delay_us = 100;
const int max_blocks = 320; // chosen based on GPU, could use runtime calls to set this via cudaGetDeviceProperties

template <typename T>
__global__ void fluxKernel(T * __restrict__ d, const size_t n, const int intensity){

  size_t idx = ((size_t)blockDim.x) * blockIdx.x + threadIdx.x;
  if (idx < n){
    T temp = d[idx];
    for (int i = 0; i < intensity; i++)
      temp = sin(temp);  // just some dummy code to simulate "real work"
    d[idx] = temp;
    }
}

template <typename T>
__global__ void sendKernel(const T * __restrict__ d, const size_t n, T * __restrict__ b){

  for (size_t idx = ((size_t)blockDim.x) * blockIdx.x + threadIdx.x; idx < n; idx += ((size_t)blockDim.x)*gridDim.x)
    b[idx] = d[idx];
}

template <typename T>
__global__ void recvKernel(const T * __restrict__ b, const size_t n, T * __restrict__ d){

  for (size_t idx = ((size_t)blockDim.x) * blockIdx.x + threadIdx.x; idx < n; idx += ((size_t)blockDim.x)*gridDim.x)
    d[idx] = b[idx];
}

void CUDART_CB MyCallback(cudaStream_t stream, cudaError_t status, void *data){
    printf("Loop %lu callback\n", (size_t)data);
    usleep(host_func_delay_us); // simulate: this is where non-cuda-aware MPI calls would go, operating on h_buf
}
int main(){

  // get the range of stream priorities for this device
  int priority_high, priority_low;
  cudaDeviceGetStreamPriorityRange(&priority_low, &priority_high);
  // create streams with highest and lowest available priorities
  cudaStream_t st_high, st_low;
  cudaStreamCreateWithPriority(&st_high, cudaStreamNonBlocking, priority_high);
  cudaStreamCreateWithPriority(&st_low, cudaStreamNonBlocking, priority_low);
  // allocations
  mt *h_buf, *d_buf, *d_data;
  cudaMalloc(&d_data, ds*sizeof(d_data[0]));
  cudaMalloc(&d_buf, bs*sizeof(d_buf[0]));
  cudaHostAlloc(&h_buf, bs*sizeof(h_buf[0]), cudaHostAllocDefault);
  cudaCheckErrors("setup error");
  // main processing loop
  for (unsigned long i = 0; i < loops; i++){
    // issue low-priority
    fluxKernel<<<((ds-bs)+nTPB)/nTPB, nTPB,0,st_low>>>(d_data+bs, ds-bs, my_intensity);
    // issue high-priority
    sendKernel<<<max_blocks,nTPB,0,st_high>>>(d_data, bs, d_buf);
    cudaMemcpyAsync(h_buf, d_buf, bs*sizeof(h_buf[0]), cudaMemcpyDeviceToHost, st_high);
    cudaStreamAddCallback(st_high, MyCallback, (void*)i, 0);
    cudaMemcpyAsync(d_buf, h_buf, bs*sizeof(h_buf[0]), cudaMemcpyHostToDevice, st_high);
    recvKernel<<<max_blocks,nTPB,0,st_high>>>(d_buf, bs, d_data);
    fluxKernel<<<((bs)+nTPB)/nTPB, nTPB,0,st_high>>>(d_data, bs, my_intensity);
    cudaDeviceSynchronize();
    cudaCheckErrors("loop error");
    }
  return 0;
}

Here is the Visual Profiler timeline output (on Linux, Tesla V100):

[Image: Visual Profiler timeline]

Note that arranging for complex concurrency scenarios on Windows WDDM can be quite challenging. I recommend avoiding it, and this answer does not intend to discuss all of the challenges there. I suggest using Linux, or Windows TCC GPUs, to do this.
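
If it is unclear which driver model a Windows GPU is running under, one way to check (a minimal sketch, not part of the code above) is to query the device properties:

#include <cstdio>
#include <cuda_runtime.h>

int main(){
  cudaDeviceProp prop;
  cudaGetDeviceProperties(&prop, 0);
  // On Windows, tccDriver is 1 when the GPU uses the TCC driver and 0 for WDDM; on Linux it is simply 0.
  printf("%s: tccDriver = %d\n", prop.name, prop.tccDriver);
  return 0;
}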

If you try this code on your machine, you may need to adjust some of the various constants to make things look like this.
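
For example, the max_blocks constant could be derived at runtime instead of being hard-coded, along the lines of the comment in the code above. The sketch below reuses names from that code (sendKernel, mt, nTPB); aiming for one full wave of resident blocks is my assumption, not a requirement:

// Rough sketch: size max_blocks from the device instead of hard-coding 320
int device = 0, num_sms = 0, blocks_per_sm = 0;
cudaDeviceGetAttribute(&num_sms, cudaDevAttrMultiProcessorCount, device);
cudaOccupancyMaxActiveBlocksPerMultiprocessor(&blocks_per_sm, sendKernel<mt>, nTPB, 0);
const int max_blocks = num_sms * blocks_per_sm;  // enough blocks to fill the GPU once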
