0

我有一个数据结构哈希表,它具有线性探测哈希方案,并被设计为与 CAS 无锁。

哈希表

constexpr uint64_t HASH_EMPTY = 0xffffffffffffffff;

struct OnceLock {

    static const unsigned LOCK_FRESH   = 0;
    static const unsigned LOCK_WORKING = 1;
    static const unsigned LOCK_DONE    = 2;

    volatile unsigned lock;

    __device__ void init() {
        lock = LOCK_FRESH;
    } 

    __device__ bool enter() {
        unsigned lockState = atomicCAS ( (unsigned*) &lock, LOCK_FRESH, LOCK_WORKING );
        return lockState == LOCK_FRESH;
    }

    __device__ void done() {
        __threadfence();
        lock = LOCK_DONE;
        __threadfence();
    }

    __device__ void wait() {
        while ( lock != LOCK_DONE );
    }
};
template <typename T>
struct agg_ht {
    OnceLock lock;
    uint64_t hash;
    T payload;
};


template <typename T>
__global__ void initAggHT ( agg_ht<T>* ht, int32_t num ) {
    for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < num; i += blockDim.x * gridDim.x) {
    ht[i].lock.init();
    ht[i].hash = HASH_EMPTY;
    }
}


// returns candidate bucket
template <typename T>
__device__ int hashAggregateGetBucket ( agg_ht<T>* ht, int32_t ht_size, uint64_t grouphash, int& numLookups, T* payl ) {
    int location=-1;
    bool done=false;
    while ( !done ) {
        location = ( grouphash + numLookups ) % ht_size;
        agg_ht<T>& entry = ht [ location ];
        numLookups++;
        if ( entry.lock.enter() ) {
            entry.payload = *payl;
            entry.hash = grouphash;
            entry.lock.done();
        }
        entry.lock.wait();
        done = (entry.hash == grouphash);
        if ( numLookups == ht_size ) {
            printf ( "agg_ht hash table full at threadIdx %d & blockIdx %d \n", threadIdx.x, blockIdx.x );
            break;
        }
    }
    return location;
}

然后我有一个最小的内核以及主函数,只是为了让哈希表运行。重要的是哈希表用 注释__shared__,它被分配在 SM 的共享内存中,用于快速访问。(我没有添加任何输入数据cudaMalloc以保持示例最小化。)

#include <cstdint>
#include <cstdio>

/**hash table implementation**/

constexpr int HT_SIZE = 1024;

__global__ void kernel() { 
    __shared__ agg_ht<int> aht2[HT_SIZE]; 
    {
        int ht_index;
        unsigned loopVar = threadIdx.x;
        unsigned step = blockDim.x;
        while(loopVar < HT_SIZE) {
            ht_index = loopVar;
            aht2[ht_index].lock.init();
            aht2[ht_index].hash = HASH_EMPTY;
            loopVar += step;
        }
    }

    int key = 1;
    int value = threadIdx.x;

    __syncthreads();

    int bucket = -1;
    int bucketFound = 0;
    int numLookups = 0;
    while(!(bucketFound)) {
        bucket = hashAggregateGetBucket ( aht2, HT_SIZE, key, numLookups, &(value));
        int probepayl = aht2[bucket].payload;
        bucketFound = 1;
        bucketFound &= ((value == probepayl));
    }
}

int main() {
    kernel<<<1, 128>>>();
    cudaDeviceSynchronize();
    return 0;
}

编译它的标准方法,如果文件被调用test.cu$ nvcc -G test.cu -o test

我不得不说,这个哈希表在大量输入下的并发插入期间总是会给我正确的答案。但是,当我运行racecheck它时,我到处都看到错误:

$ compute-sanitizer --tool racecheck ./test
========= COMPUTE-SANITIZER
========= Error: Race reported between Write access at 0xd20 in /tmp/test.cu:61:int hashAggregateGetBucket<int>(agg_ht<T1> *, int, unsigned long, int &, T1 *)
=========     and Read access at 0xe50 in /tmp/test.cu:65:int hashAggregateGetBucket<int>(agg_ht<T1> *, int, unsigned long, int &, T1 *) [1016 hazards]
========= 
========= Error: Race reported between Write access at 0x180 in /tmp/test.cu:25:OnceLock::done()
=========     and Read access at 0xd0 in /tmp/test.cu:30:OnceLock::wait() [992 hazards]
========= 
========= Error: Race reported between Write access at 0xcb0 in /tmp/test.cu:60:int hashAggregateGetBucket<int>(agg_ht<T1> *, int, unsigned long, int &, T1 *)
=========     and Read access at 0x1070 in /tmp/test.cu:103:kernel() [508 hazards]
========= 
========= RACECHECK SUMMARY: 3 hazards displayed (3 errors, 0 warnings)

我很困惑,我相信这个线性探测哈希表可以通过我的单元测试,但到处都有数据竞争的危险。我想这些危险与正确性无关。(?)

经过一段时间的调试,我仍然无法消除危险错误。我坚信这volatile是原因。我希望有人能够对此有所了解,并帮助我解决那些烦人的危险。

我也希望这个问题可以反映有关该主题的一些设计思想:共享内存上的数据结构。在 StackOverflow 上搜索时,我看到的只是共享内存中的普通原始数组。

4

1 回答 1

1

我想这些危险与正确性无关。(?)

我不会尝试证明您的应用程序或算法的“正确性”。如果那是您正在寻找的东西,请忽略我的回答。

我希望有人能够阐明它

当一个线程写入共享内存中的某个位置,而另一个线程从该位置读取,并且代码中没有干预同步以确保写入发生在读取之前(或者更准确地说,写入的值对读取线程可见)。这不是一个仔细、详尽的定义,但足以满足我们在此处理的内容。

就该定义而言,您的代码中肯定有该活动。一个被标记的具体案例是一个线程写在这里:

        entry.hash = grouphash;

另一个线程在这里读取相同的位置:

    done = (entry.hash == grouphash);

__syncthreads()检查您的代码,我们可以看到这两个代码位置之间没有语句。此外,由于包含该活动的循环,与此相关的危害不止一种(有两种)。

另一个被标记的交互是一个线程写到lock这里:

        entry.lock.done();

另一个线程在这里读取相同的lock位置:

    entry.lock.wait();

这里报告的危险实际上是针对其他代码行报告的,因为它们都是函数调用。同样,没有干预同步。

我承认,由于您的应用程序的循环性质,我不确定这些线程到线程通信路径中的任何一个是否需要尽早获得“正确性”。但是,我没有仔细研究过您的申请,也不打算说明任何有关正确性的内容。

帮我解决那些恼人的危险。

碰巧的是,这两种交互都在代码的一小部分中,因此根据我的测试,我们可以通过以下添加来消除这 3 个危害:

    __syncthreads();  // add this line
    entry.lock.wait();
    done = (entry.hash == grouphash);
    __syncthreads();  // add this line

第一个同步与我已经指出的行之间明显的读写连接相交。由于此时代码的循环性质,需要第二次同步。

另请注意,正确使用线程块中__syncthreads()的所有线程都可以到达该同步点。快速浏览您在此处所拥有的内容并没有向我暗示需要仔细处理上述行/添加,但您应该确认这一点并注意一般应用程序/用法。可能是while bucketFound循环会在这里产生一种情况,应该以不同的方式处理,但是compute-sanitizer --tool synccheck没有报告任何问题,在 V100 上运行,并添加了我在此处建议的添加。

于 2021-12-15T17:39:39.343 回答