8

我正在使用 OpenCL 来查找两组 3D 点之间的最近邻居。

最近邻:对于数据集中的每个点(x,y,z),我必须找到模型中最近的一个。平方距离 = (Ax-Bx)^2 + (Ay-By)^2 + (Az-Bz)^2

这是我到目前为止所做的:

struct point {
int x;
int y;
int z;
};

__kernel void 
nearest_neighbour(__global struct point *model,
__global struct point *dataset,
__global int *nearest,
const unsigned int model_size)
{
    int g_dataset_id = get_global_id(0);

    int dmin = -1;
    int d, dx, dy, dz;

    for (int i=0; i<model_size; ++i) {
        dx = model[i].x - dataset[g_dataset_id].x;
        dx = dx * dx;

        dy = model[i].y - dataset[g_dataset_id].y;
        dy = dy * dy;

        dz = model[i].z - dataset[g_dataset_id].z;
        dz = dz * dz;

        d = dx + dy + dz;

        if(dmin == -1 || d < dmin)
        {
            nearest[g_dataset_id] = i;
            dmin = d;
        }
    }
}

代码似乎可以工作,但我确信它可以被优化。我想知道如何利用本地内存使其变得更好。

谢谢

PS 我知道还有其他(更好的)方法可以找到最近的邻居,比如 kd-tree,但现在我想做简单的方法。

4

7 回答 7

4

编译器可能会为您提升这些循环不变量,但为了确保它完成,请尝试将它们分配给命名的临时变量等的这段代码datum_x。此外,将 dmin 初始化MAX_INT为可以避免与-1. 另一种方法是展开第一个循环迭代(使用i=0)以初始化 dmin。

int dmin = MAX_INT;
int d, dx, dy, dz;
int datum_x, datum_y, datum_z;

datum_x = dataset[g_model_id].x;
datum_y = dataset[g_model_id].y;
datum_z = dataset[g_model_id].z;

for (int i=0; i<size_dataset; ++i) {
    dx = model[i].x - datum_x;
    dx = dx * dx;

    dy = model[i].y - datum_y;
    dy = dy * dy;

    dz = model[i].z - datum_z;
    dz = dz * dz;

    d = dx + dy + dz;

    if(d < dmin)
    {
        nearest[g_dataset_id] = i;
        dmin = d;
    }
}
于 2011-03-21T17:51:03.413 回答
2

也许一个快速的预过滤器可以加快速度。您可以先检查所有三个坐标中的距离是否都小于 dmin,而不是立即计算平方距离。因此,您可以将内部循环替换为

{
    dx = model[i].x - datum_x;
    if (abs(dx) >= dmin) continue;

    dy = model[i].y - datum_y;
    if (abs(dy) >= dmin) continue;

    dz = model[i].z - datum_z;
    if (abs(dz) >= dmin) continue;

    dx = dx * dx;    
    dy = dy * dy;
    dz = dz * dz;

    d = dx + dy + dz;

    if(d < dmin)
    {
        nearest[g_dataset_id] = i;
        dmin = d;
    }
}

我不确定对每个点的额外调用是否abs()if过滤掉足够的数据点,但我认为这是一个足够简单的更改来尝试。

于 2011-03-22T02:06:23.200 回答
1

对于您“我想知道如何利用本地内存使其变得更好”的具体问题。

使用 GPU 的本地内存可能会很棘手。在处理它之前,您需要花一些时间阅读 SDK 的代码示例和编程指南。

基本上,您使用本地内存来缓存一些全局数据块——在你的例子中是 model[] 数组——这样你可以从那里读取它比从全局读取它更快。如果您确实想尝试一下,它会像这样的伪代码:

For each block of the model array {
    1) Read data from __global and write it to __local
    2) Barrier
    3) For each model datum in the __local cache,
       Read it and process it.
    4) Barrier
}

步骤 3 基本上是您现在拥有的循环,除了它只会处理模型数据的一部分而不是整个事情。

无论何时使用本地内存,第 2 步和第 4 步都是绝对必要的。您必须同步工作组中的所有主题。屏障强制所有工作项完成屏障之前的代码,然后才允许它们中的任何一个继续执行屏障之后的代码。这可以防止工作项在其他线程将数据写入本地内存之前从本地内存中读取数据。我不记得屏障指令的语法,但它们在 OpenCL 文档中。

第 1 步,您将让每个工作项从全局读取不同的数据并将其写入本地缓存。

像这样的东西(注意这是过于简单且未经测试的!):

__local float4 modelcache[CACHESIZE];
int me = get_local_id(0);

for (int j = 0; j < model_size; j += CACHESIZE) {
    modelcache[me] = dataset[j+me];
    barrier(CLK_LOCAL_MEM_FENCE);
    for (int i=0; i < CACHESIZE; ++i) {
        d_xyz = modelcache[i] - local_xyz;
        ... etc.
    }
    barrier(CLK_LOCAL_MEM_FENCE);
}

那么设计问题是:本地缓存应该有多大?工作组的规模是多少?

本地数据存储在工作组中的工作项之间共享。如果您的 ND 工作项数组并行执行多个工作组,则每个工作组都有自己的模型缓存副本。

如果您将本地数据数组设置得太小,则使用它们会获得很少或没有任何好处。如果你把它们弄得太大,那么 GPU 就不能并行执行尽可能多的工作组,而且你实际上可能运行得相当慢。

最后,我不得不说,这种特殊的算法不太可能从本地内存缓存中受益。在您的程序中,所有工作项都在同时读取相同的 model[i] 位置,并且大多数 GPU 都具有经过专门优化的硬件,可以快速执行此操作。

于 2011-04-03T22:39:05.887 回答
1

在 Eric Bainville 提出建议后,我试图摆脱点结构。正如我所建议的那样,我使用了 float4,这就是我所做的:

__kernel void 
nearest_neighbour(__global float4 *model,
__global float4 *dataset,
__global unsigned int *nearest,
const unsigned int model_size)
{
    int g_dataset_id = get_global_id(0);

    float dmin = MAXFLOAT;
    float d;

    /* Ottimizzato per memoria locale */
    float4 local_xyz = dataset[g_dataset_id];
    float4 d_xyz;
    int imin;

    for (int i=0; i<model_size; ++i) {
        d_xyz = model[i] - local_xyz;

        d_xyz *= d_xyz;

        d = d_xyz.x + d_xyz.y + d_xyz.z;

        if(d < dmin)
        {
            imin = i;
            dmin = d;
        }
    }

    nearest[g_dataset_id] = imin; // Write only once in global memory
}

问题是这个版本比基于点结构的版本运行得慢一些。可能是因为在结构一中我使用了预过滤器:

dx = model[i].x - local_x;
dx = dx * dx;
if (dx >= dmin) continue;

dy = model[i].y - local_y;
dy = dy * dy;
if (dy >= dmin) continue;

dz = model[i].z - local_z;
dz = dz * dz;
if (dz >= dmin) continue;

d = dx + dy + dz;

我不能使用那个预过滤器宽度 float4 版本。您认为我可以对 float4 版本进行其他优化吗?

谢谢大家的宝贵建议

于 2011-03-22T20:54:19.107 回答
1

Heath 的建议也可以应用于输出索引:维护一个变量nearest_id,并且在循环之后只写一次。

我会尝试使用 int4 向量,而不是 3 个组件结构,并使用向量操作。

于 2011-03-22T10:49:10.350 回答
1

我首先想到的是希思提出的建议。每个工作项model[i]同时访问内存项。根据编译器的优化程度,最好让每个工作项访问数组中的不同元素。一种错开它的方法是:

int datum_x, datum_y, datum_z;

datum_x = dataset[g_model_id].x;
datum_y = dataset[g_model_id].y;
datum_z = dataset[g_model_id].z;

for (int i=0; i<size_dataset; ++i) {
    j = (i + g_model_id) % size_dataset;  // i --> j by cyclic permutation

    dx = model[j].x - datum_x;
    dx = dx * x;

    dy = model[j].y - datum_y;
    dy = dy * dy;

    /* and so on... */
}

但是,您的代码中的访问很可能model[i]是作为“广播”处理的,在这种情况下,我的代码运行速度会变慢。

于 2011-03-21T21:17:07.470 回答
1

我很确定花费大量时间将当前最小值写入nearest[g_dataset_id]. 访问全局内存通常很慢,因此最好将当前最小值存储在寄存器中,就像使用dmin = d.

像这样:

...
int dmin = MAX_INT;
int imin = 0;
...
for (...)
{
  ...
  if(d < dmin)
  {
    imin = i;
    dmin = d;
  }
}

nearest[g_dataset_id] = imin; //write to global memory only once
于 2011-03-22T11:14:05.923 回答