8

对于当前的 OpenCL GPGPU 项目,我需要根据具有 64 个可能值的某个键对数组中的元素进行排序。我需要最终的数组使所有具有相同键的元素都是连续的。new_index[old_index]将关联数组作为此任务的输出就足够了。

我把任务分成两部分。首先,我为每个可能的键(桶)计算带有这个键的元素的数量(进入那个桶)。我扫描这个数组(生成一个前缀总和),它指示每个桶的元素的新索引范围,比如每个桶的“开始”索引。

然后,第二步必须为每个元素分配一个新索引。如果我要在 CPU 上实现它,算法将是这样的:

for all elements e:
    new_index[e] = bucket_start[bucket(e)]++

当然,这在 GPU 上不起作用。每个项目都需要以bucket_start读写模式访问数组,这本质上是所有工作项目之间的同步,这是我们能做的最糟糕的事情。

一个想法是将一些计算放在工作组中。但是我不确定这应该如何准确地完成,因为我在 GPGPU 计算方面没有经验。

在全局内存中,我们使用上述前缀 sum 初始化存储桶起始数组。对这个数组的访问是用一个原子 int “互斥”的。(我是新手,所以可能会在这里混用一些词。)

每个工作组都被隐式分配了输入元素数组的一部分。它使用包含新索引的本地存储桶数组,相对于我们尚不知道的(全局)存储桶开始。在这些“本地缓冲区”之一已满后,工作组必须将本地缓冲区写入全局数组。为此,它锁定对全局存储桶起始数组的访问,将这些值增加当前本地存储桶大小,解锁,然后可以将结果写入全局new_index数组(通过添加相应的偏移量)。重复此过程,直到处理完所有分配的元素。

出现两个问题:

  1. 这是一个好方法吗?我知道从/向全局内存读取和写入很可能是这里的瓶颈,特别是因为我试图获得对全局内存(至少只有一小部分)的同步访问。但也许有更好的方法来做到这一点,也许使用内核分解。请注意,我尽量避免在内核期间将数据从 GPU 读回 CPU(以避免 OpenCL 命令队列刷新,这也很糟糕,因为我很坚强)。

  2. 在上面的算法设计中,如何实现锁定机制?下面的代码会起作用吗?特别是,当硬件在 SIMD 组中执行“真正并行”的工作项目时,我预计会出现问题,例如 Nvidia“扭曲”。在我当前的代码中,工作组的所有项目都将尝试以 SIMD 方式获取锁。我应该将此限制为仅第一个工作项吗?并使用障碍使它们在本地保持同步?

    #pragma OPENCL EXTENSION cl_khr_global_int32_base_atomics : enable
    
    __kernel void putInBuckets(__global uint *mutex,
                               __global uint *bucket_start,
                               __global uint *new_index)
    {
        __local bucket_size[NUM_BUCKETS];
        __local bucket[NUM_BUCKETS][LOCAL_MAX_BUCKET_SIZE]; // local "new_index"
    
        while (...)
        {
            // process a couple of elements locally until a local bucket is full
            ...
    
            // "lock"
            while(atomic_xchg(mutex, 1)) {
            }
    
            // "critical section"
            __local uint l_bucket_start[NUM_BUCKETS];
            for (int b = 0; b < NUM_BUCKETS; ++b) {
                l_bucket_start[b] = bucket_start[b]; // where should we write?
                bucket_start[b] += bucket_size[b];   // update global offset
            }
    
            // "unlock"
            atomic_xchg(mutex, 0);
    
            // write to global memory by adding the offset
            for (...)
                new_index[...] = ... + l_bucket_start[b];
        }
    }
    
4

3 回答 3

3

首先永远不要尝试在 GPU 上实现锁定算法。它将陷入僵局和停滞。这是因为 GPU 是一种 SIMD 设备,并且线程不像在 CPU 上那样独立执行。GPU 同步执行一组称为 WARP/WaveFront 的线程。因此,如果 Wave Front 中的一个线程停止,它会停止 Wave Front 中的所有其他线程。如果解锁线程处于停滞的波前,它将不会执行和解锁互斥锁。

原子操作没问题。

您应该考虑的是无锁方法。有关解释和示例 CUDA 代码,请参阅本文: http ://www.cse.iitk.ac.in/users/mainakc/pub/icpads2012.pdf/

它使用一些示例 CUDA 代码描述了无锁哈希表、链表和跳过列表。

建议的方法是创建一个两级数据结构。

第一级是无锁跳过列表。每个跳过列表条目都具有用于重复值的无锁链表的第二级结构。以及条目数的原子计数。

插入方法

1) 生成 64 桶密钥 2) 在跳过列表中查找密钥 3) 如果未找到,则插入到跳过列表中 4) 将数据插入到链表中 5) 增加此桶的原子计数器

插入前缀后,将跳过列表桶的所有计数器相加,以便找到输出的偏移量。

于 2013-06-12T18:25:05.803 回答
0

我找到了一种将本地缓冲区附加到全局数组的更简单的方法。它只需要两个步骤,其中一个涉及原子操作。

第一步是在全局目标数组中分配索引,每个线程将在其中写入其元素。为此,我们可以使用 in aatomic_add(__global int*)添加要附加的元素数量。bucket_start在这个具体的例子中使用这个函数。的返回值atomic_add旧值

在第二步中,我们将此返回值用作复制目标数组中本地缓冲区的基本索引。如果我们决定将整个线程组用于一个这样的追加操作,我们将“像往常一样”将本地缓冲区复制到线程组内的全局数组中。在上面的桶排序示例中,我们复制了多个数组,当数组的数量(=桶数)等于工作组大小时,我们可以改为为每个线程分配一个桶,将循环复制。

于 2014-10-27T15:51:21.277 回答
0

我最近不得不解决一个类似的问题,我找到了一个更优雅、更有效的解决方案。我以为我会分享。

一般算法如下:

1.内核1:每个元素的线程

  • 计算每个桶中的元素数量(直方图)。
  • 对于每个元素:计算每个值从桶开始的偏移量(棘手的部分)。

2.内核2:每个桶的线程

  • 直方图上的前缀总和(扫描)以计算每个桶的开始

3.内核3:每个元素的线程

  • 分散元素。

    对于输入中的每个元素 i: output[i] = prefix_sum[input[i]] + offsets[i];

棘手的部分是生成我们在第三个内核中使用的偏移量数组。

在第一个内核上,我们定义了一个包含每个工作组存储桶直方图的本地缓存。我使用 atomic_add 返回此计数器的先前值 - “当前”偏移量这一事实。这个事实是关键。

__kernel void bucket_histogram(__global uint *input,__global uint *histogram,__global uint *offsets) {

__local local_histogram[NUM_BUCKETS];

size_t local_idx = get_local_id(0);
size_t global_idx = get_global_id(0);

// zero local mem

if (local_idx < NUM_BUCKETS)
{
    local_histogram[local_idx] = 0;
}
barrier(CLK_LOCAL_MEM_FENCE);

// increment local histogram, save the local offset for later
uint value = input[global_idx];
uint local_offset = atomic_add(&local_histogram[value], 1);

barrier(CLK_LOCAL_MEM_FENCE);

// store the buckets in the global histogram (for later prefix sum)

if (local_idx < NUM_BUCKETS)
{
    uint count = local_histogram[local_idx];
    if (count > 0)
    {
        // increment the global histogram, save the offset!
        uint group_offset_for_the_value_local_idx = atomic_add(&histogram[local_idx], count);
        local_histogram[local_idx] = group_offset_for_the_value_local_idx;
    }
}

barrier(CLK_LOCAL_MEM_FENCE);

// now local_histogram changes roles, it contains the per-value group offset from the start of the bucket

offsets[global_idx] = local_offset + local_histogram[value];

第二个内核执行前缀和来计算每个桶的开始。第三个内核简单地组合了所有的偏移量:

__kernel void bucket_sort_scatter(__global uint *input, __global uint* prefix_sum_histogram, __global uint* offsets, __global data_t *output) {

size_t global_idx = get_global_id(0);
uint value = input[global_idx];
uint scatter_target = offsets[global_idx] + prefix_sum_histogram[value];
output[scatter_target] = value;
于 2020-11-15T18:21:21.143 回答