与所有指令一样,原子指令是按 Warp 调度的。然而,有一个与原子相关的未指定流水线,并且不能保证通过流水线的调度指令流对于每个线程、流水线的每个阶段都以锁步执行。这为您的观察提供了可能性。
我相信一个简单的思想实验将证明这一定是真的:如果同一条经线中的 2 个线程针对同一位置怎么办?显然,处理的每个方面都无法同步进行。我们可以将这个思想实验扩展到我们在一个 SM 内甚至跨 SM 中每个时钟有多个问题的情况,作为附加示例。
如果向量长度足够短(16 字节或更少),那么只需让线程中的线程写入适当的向量类型数量(例如int4
. 只要所有线程(无论它们在网格中的哪个位置)都在尝试更新一个自然对齐的位置,写入不应被其他写入破坏。
但是,在评论中讨论之后,似乎 OP 的目标是能够让经线或线程块更新一定长度的向量,而不受其他经线或线程块的干扰。在我看来,真正需要的是访问控制(以便一次只有一个扭曲或线程块正在更新特定的向量),并且 OP 有一些代码没有按预期工作。
可以使用普通的原子操作(atomicCAS
在下面的示例中)强制执行此访问控制,以允许一次只允许一个“生产者”更新向量。
下面是一个示例生产者-消费者代码,其中有多个线程块正在更新一系列向量。每个向量“slot”都有一个“slot control”变量,它被原子更新以指示:
- 矢量为空
- 矢量正在填充
- 矢量已填充,准备“消费”
通过这个 3 级方案,我们可以允许消费者和多个生产者工作人员对向量进行普通访问,并使用一个普通的原子变量访问机制。这是一个示例代码:
#include <assert.h>
#include <iostream>
#include <stdio.h>
const int num_slots = 256;
const int slot_length = 32;
const int max_act = 65536;
const int slot_full = 2;
const int slot_filling = 1;
const int slot_empty = 0;
const int max_sm = 64; // needs to be greater than the maximum number of SMs for any GPU that it will be run on
__device__ int slot_control[num_slots] = {0};
__device__ int slots[num_slots*slot_length];
__device__ int observations[max_sm] = {0}; // reported by consumer
__device__ int actives[max_sm] = {0}; // reported by producers
__device__ int correct = 0;
__device__ int block_id = 0;
__device__ volatile int restricted_sm = -1;
__device__ int num_act = 0;
static __device__ __inline__ int __mysmid(){
int smid;
asm volatile("mov.u32 %0, %%smid;" : "=r"(smid));
return smid;}
// this code won't work on a GPU with a single SM!
__global__ void kernel(){
__shared__ volatile int done, update, next_slot;
int my_block_id = atomicAdd(&block_id, 1);
int my_sm = __mysmid();
if (my_block_id == 0){
if (!threadIdx.x){
restricted_sm = my_sm;
__threadfence();
// I am "block 0" and process the vectors, checking for coherency
// "consumer"
next_slot = 0;
volatile int *vslot_control = slot_control;
volatile int *vslots = slots;
int scount = 0;
while(scount < max_act){
if (vslot_control[next_slot] == slot_full){
scount++;
int slot_val = vslots[next_slot*slot_length];
for (int i = 1; i < slot_length; i++) if (slot_val != vslots[next_slot*slot_length+i]) { assert(0); /* badness - incoherence */}
observations[slot_val]++;
vslot_control[next_slot] = slot_empty;
correct++;
__threadfence();
}
next_slot++;
if (next_slot >= num_slots) next_slot = 0;
}
}}
else {
// "producer"
while (restricted_sm < 0); // wait for signaling
if (my_sm == restricted_sm) return;
next_slot = 0;
done = 0;
__syncthreads();
while (!done) {
if (!threadIdx.x){
while (atomicCAS(slot_control+next_slot, slot_empty, slot_filling) > slot_empty) {
next_slot++;
if (next_slot >= num_slots) next_slot = 0;}
// we grabbed an empty slot, fill it with my_sm
if (atomicAdd(&num_act, 1) < max_act) update = 1;
else {done = 1; update = 0;}
}
__syncthreads();
if (update) slots[next_slot*slot_length+threadIdx.x] = my_sm;
__threadfence(); //enforce ordering
if ((update) && (!threadIdx.x)){
slot_control[next_slot] = 2; // mark slot full
atomicAdd(actives+my_sm, 1);}
__syncthreads();
}
}
}
int main(){
kernel<<<256, slot_length>>>();
cudaDeviceSynchronize();
cudaError_t res= cudaGetLastError();
if (res != cudaSuccess) printf("kernel failure: %d\n", (int)res);
int *h_obs = new int[max_sm];
int *h_act = new int[max_sm];
int h_correct;
cudaMemcpyFromSymbol(h_obs, observations, sizeof(int)*max_sm);
cudaMemcpyFromSymbol(h_act, actives, sizeof(int)*max_sm);
cudaMemcpyFromSymbol(&h_correct, correct, sizeof(int));
int h_total_act = 0;
int h_total_obs = 0;
for (int i = 0; i < max_sm; i++){
std::cout << h_act[i] << "," << h_obs[i] << " ";
h_total_act += h_act[i];
h_total_obs += h_obs[i];}
std::cout << std::endl << h_total_act << "," << h_total_obs << "," << h_correct << std::endl;
}
我不声称此代码对于任何用例都没有缺陷。它是先进的,用于展示概念的可操作性,而不是作为生产就绪代码。在我测试过的几个不同的系统上,它似乎对我有用。它不应该在只有一个 SM 的 GPU 上运行,因为一个 SM 是为消费者保留的,其余的 SM 由生产者使用。