我正在使用 CFD 代码(用于计算流体动力学)。我最近有机会看到英特尔编译器在我的一个循环中使用 SSE,在这个循环中将计算性能提高了近 2 倍。但是,使用 SSE 和 SIMD 指令似乎更像是运气。大多数时候,编译器什么都不做。
然后我试图强制使用 SSE,考虑到 AVX 指令将在不久的将来加强这一方面。
我做了一个简单的一维传热代码。它由两个阶段组成,使用另一个阶段的结果(U0 -> U1,然后是 U1 -> U0,然后是 U0 -> U1,等等)。当它迭代时,它会收敛到稳定的解决方案。我在主代码中的大部分循环都使用相同类型的计算。(有限差分)。
但是,我的代码比正常循环慢两倍。结果是一样的,所以计算是一致的。
我犯错了吗?在超级计算机(使用 Westmer)上测试之前,我使用 Core 2 来测试循环。
这是代码,带有 SSE 循环,然后是参考循环:
#include <stdio.h>
#include <emmintrin.h>
#include <time.h>
//#include <vector>
#define n1 1004
#define niter 200000
int i,j,t;
double U0[n1] __attribute__ ((aligned(16)));
double U1[n1] __attribute__ ((aligned(16)));
double Dx,Dy,Lx,Ly,InvDxDx,Dt,alpha,totaltime,Stab,DtAlpha,DxDx;
__m128d vmmx00;
__m128d vmmx01;
__m128d vmmx02;
__m128d vmmx10;
__m128d va;
__m128d vb;
__m128d vc;
__m128d vd;
clock_t time0,time1;
FILE *f1;
int main()
{
/* ---- GENERAL ---- */
alpha = 0.4;
totaltime = 1.0/100.0;
Dt = totaltime/((niter-1)*1.0);
Lx = 1.0;
Dx = Lx/((n1-1)*1.0);
InvDxDx = 1.0/(Dx*Dx);
DxDx = Dx*Dx;
Stab = alpha*Dt*(InvDxDx);
DtAlpha = Dt*alpha;
/* Stability if result <= 0.5 */
printf("Stability factor : %f \n",Stab);
for( i = 0; i < n1; i++){U0[i] = 0.0;}
U0[1] = 1.0;
U0[2] = 1.0;
U0[3] = 1.0;
U0[n1-2] = 2.0;
// for ( i = 0; i < n1; i++) {
// for ( j = i + 1; j < n2; j++) {
// std::swap(U0[i][j], U0[j][i]);
// }
//}
va = _mm_set1_pd(-2.0);
vb = _mm_set1_pd(InvDxDx);
vd = _mm_set1_pd(DtAlpha);
time0=clock();
for( t = 0; t < niter; t++)
{
for( i = 2; i < n1-2; i+=2)
{
//printf("%d %d \n",i,j);
//fflush(stdout);
vmmx00 = _mm_load_pd(&U0[i]);
vmmx01 = _mm_loadu_pd(&U0[i+1]);
vmmx02 = _mm_loadu_pd(&U0[i-1]);
vmmx10 = _mm_mul_pd(va,vmmx00); // U1[i][j] = -2.0*U0[i][j];
vmmx10 = _mm_add_pd(vmmx10,vmmx01); // U1[i][j] = U1[i][j] + U0[i+1][j];
vmmx10 = _mm_add_pd(vmmx10,vmmx02); // U1[i][j] = U1[i][j] + U0[i-1][j];
vmmx10 = _mm_mul_pd(vb,vmmx10); // U1[i][j] = U1[i][j] * InvDxDx;
vmmx10 = _mm_mul_pd(vd,vmmx10); // U1[i][j] = U1[i][j] * DtAlpha;
vmmx10 = _mm_add_pd(vmmx10,vmmx00); // U1[i][j] = U1[i][j] + U0[i][j];
_mm_store_pd(&U1[i],vmmx10);
// U1[i][j] = U0[i][j] + DtAlpha*( (U0[i+1][j]-2.0*U0[i][j]+U0[i-1][j])*InvDxDx
}
for( i = 2; i < n1-2; i+=2)
{
//printf("%d %d \n",i,j);
//fflush(stdout);
vmmx00 = _mm_load_pd(&U1[i]);
vmmx01 = _mm_loadu_pd(&U1[i+1]);
vmmx02 = _mm_loadu_pd(&U1[i-1]);
vmmx10 = _mm_mul_pd(va,vmmx00); // U0[i][j] = -2.0*U1[i][j];
vmmx10 = _mm_add_pd(vmmx10,vmmx01); // U0[i][j] = U0[i][j] + U1[i+1][j];
vmmx10 = _mm_add_pd(vmmx10,vmmx02); // U0[i][j] = U0[i][j] + U1[i-1][j];
vmmx10 = _mm_mul_pd(vb,vmmx10); // U0[i][j] = U0[i][j] * InvDxDx;
vmmx10 = _mm_mul_pd(vd,vmmx10); // U0[i][j] = U0[i][j] * DtAlpha;
vmmx10 = _mm_add_pd(vmmx10,vmmx00); // U0[i][j] = U0[i][j] + U1[i][j];
_mm_store_pd(&U0[i],vmmx10);
// U1[i][j] = U0[i][j] + DtAlpha*( (U0[i+1][j]-2.0*U0[i][j]+U0[i-1][j])*InvDxDx
}
}
time1=clock();
printf("Loop 0, total time : %f \n", (double) time1-time0);
f1 = fopen ("out0.dat", "wt");
for( i = 1; i < n1-1; i++)
{
fprintf (f1, "%d\t%f\n", i, U0[i]);
}
// REF
for( i = 0; i < n1; i++){U0[i] = 0.0;}
U0[1] = 1.0;
U0[2] = 1.0;
U0[3] = 1.0;
U0[n1-2] = 2.0;
time0=clock();
for( t = 0; t < niter; t++)
{
for( i = 2; i < n1-2; i++)
{
U1[i] = U0[i] + DtAlpha* (U0[i+1]-2.0*U0[i]+U0[i-1])*InvDxDx;
}
for( i = 2; i < n1-2; i++)
{
U0[i] = U1[i] + DtAlpha* (U1[i+1]-2.0*U1[i]+U1[i-1])*InvDxDx;
}
}
time1=clock();
printf("Loop 0, total time : %f \n", (double) time1-time0);
f1 = fopen ("outref.dat", "wt");
for( i = 1; i < n1-1; i++)
{
fprintf (f1, "%d\t%f\n", i, U0[i]);
}
}
++++++++++++++++++++++++++++++++++++++++++++++++++++++ ++++++++++
编辑 :
++++++++++++++++++++++++++++++++++++++++++++++++++++++ ++++++++++
考虑到你的回答,我找到了合适的地方来讨论这个问题,所以我将扩大话题并解释我的目标。如果您接受,我们将一个接一个地讨论所有循环。这可能很长,但对于我所在领域的很多人以及像 OpenFoam 这样的开源求解器来说,它可能非常有用。不考虑对能源消耗的影响(我们都使用大型超级计算器)。
我使用的 CFD 代码在 512 个 Westmer Core 上运行了 1 个多月。我正在使用 MPI(消息传递接口)在 procs 之间进行通信。物理场可以被视为一个网格,因此是 1D、2D 或 3D 阵列,具体取决于模拟的类型。但是 3D 是你能想象到的最好的。
完整的代码在 Fortran 95 中,实际上是 C 语言的简化。它很容易与 C 接口,并且 C 例程可以直接从 Fortran 调用,类型相同(int、double、long 等)。但是,Fortran 不允许这样的优化:它的设计很简单。这就是我研究 C 指令的原因。
在所有 CFD 代码中,我们都面临着相同的问题:3 种类型的循环和 MPI 内存分配。让我们首先讨论一下循环:
空间导数(称为有限差分):循环由所有情况下的 1D 卷积组成(1D、2D、3D,一次只能在一个轴上推导)(DF = F[i-1]*A + F[i ]*B + F[i+1]*C)。但是,当使用超过 1D 时,内存访问变为以下内容:
// x1 derivative for i 1 -> n1 for j 1 ->n2 DF_x1[i][j] = F[i-1][j]*A + F[i][j]*B + F[i+1][j]*C // x2 derivative for i 1 -> n1 for j 1 ->n2 DF_x2[i][j] = F[i][j-1]*D + F[i][j]*E + F[i][j+1]*G
在第一个循环中,内存访问不继续(Fortran 中的反转,内存被反转)。这是第一个问题。使用 3D 数组时同上。
泊松方程分辨率,即矩阵乘法:循环由 1D、2D 或 3D 卷积组成,具体取决于模拟。这实际上是二阶导数 (DDF = D(DF))。
for i 1 -> n1 for j 1 ->n2 DDF[i][j] = F[i-1][j]*A + F[i][j]*B + F[i+1][j]*C + F[i][j-1]*D + F[i][j]*E + F[i][j+1]*G
这个循环和我第一次给你的循环是一样的,但是它是直接计算的,不是偶数和奇数。
加权高斯赛德尔分辨率,即与波纹管相同的循环,但具有依赖性:
// even for i 1 -> n1 for j 1 ->n2 F1[i][j] = F0[i-1][j]*A + F0[i][j]*B + F0[i+1][j]*C + F0[i][j-1]*D + F0[i][j]*E + F0[i][j+1]*G //odd for i 1 -> n1 for j 1 ->n2 F0[i][j] = F1[i-1][j]*A + F1[i][j]*B + F1[i+1][j]*C + F1[i][j-1]*D + F1[i][j]*E + F1[i][j+1]*G
这是您之前调查的循环。
然后,我们面临另一个问题:内存分配。每个核心都有自己的内存,需要与其他人共享。让我们考虑最后一个循环,但经过简化:
for t 1 -> niter
// even
for i 1 -> n1-2
F1[i] = F0[i-1]*A + F0[i]*B + F0[i+1]*C
//odd
for i 1 -> n1-2
F0[i] = F1[i-1]*A + F1[i]*B + F1[i+1]*C
考虑 n1=512,但由于 RAM 容量低,无法将其存储在本地内存中。内存分布在 core0 (1->255) 和 core1 (256-512) 上,它们不在同一台计算机上,而是在网络上。在这种情况下,i=256 中的导数需要知道点 i=255,但是这个值在另一个 proc 上。包含其他处理器值的内存称为 GHOST 内存。所以循环是:
! update boundary memory :
Share to ghost : core0 : F0[255] -> Network -> F0[0] : core1 (don't forget that for core1, the array restart from 0)
Share to ghost : core1 : F0[1] -> Network -> F0[256] : core0 (you understand that F0[256] is the ghost for core0, and F0[0] is the ghost for core1)
// even, each core do this loop.
for i 1 -> n1-2
F1[i] = F0[i-1]*A + F0[i]*B + F0[i+1]*C
! update boundary memory :
Share to ghost : core0 : F1[255] -> Network -> F1[0] : core1
Share to ghost : core1 : F1[1] -> Network -> F1[256] : core0
//odd, each core do this loop.
for i 1 -> n1-2
F0[i] = F1[i-1]*A + F1[i]*B + F1[i+1]*C
我们需要处理这个问题。神秘的,你现在明白我要去哪里了:循环交错需要考虑到这一点。我认为这可以通过这种方式完成:
! update boundary memory :
Share to ghost : core0 : F0[255] -> Network -> F0[0] : core1
Share to ghost : core1 : F0[1] -> Network -> F0[256] : core0
for t 1 -> niter
! compute borders in advance :
core0 only : F1[255] = F0[254]*A + F0[255]*B + F0[256]*C
core1 only : F1[1] = F0[0]*A + F0[1]*B + F0[2]*C
Launch Share to ghost asynchronous : core0 : F1[255] -> Network -> F1[0] : core1
Launch Share to ghost asynchronous : core1 : F1[1] -> Network -> F1[256] : core0
During the same time (this can be done at the same time because MPI support asynchronous communications)
// even
for i 2 -> n1-3 (note the reduced domain)
F1[i] = F0[i-1]*A + F0[i]*B + F0[i+1]*C
Check that communications are done.
! compute borders in advance :
core0 only : F0[255] = F1[254]*A + F1[255]*B + F1[256]*C
core1 only : F0[1] = F1[0]*A + F1[1]*B + F1[2]*C
Launch Share to ghost asynchronous : core0 : F0[255] -> Network -> F0[0] : core1
Launch Share to ghost asynchronous : core1 : F0[1] -> Network -> F0[256] : core0
//odd, each core do this loop.
for i 2 -> n1-3
F0[i] = F1[i-1]*A + F1[i]*B + F1[i+1]*C
Check that communications are done.
希望我没有在某处犯错索引。让我们暂时考虑第一种类型的循环,这是最简单的,然后我们可以通过循环 2 和 3,它们是相似的。目的是这样做(类似于图像处理):
// x1 derivative
for i 1 -> n1
for j 1 ->n2
DF_x1[i][j] = F[i-1][j]*A + F[i][j]*B + F[i+1][j]*C
// x2 derivative
for i 1 -> n1
for j 1 ->n2
DF_x2[i][j] = F[i][j-1]*D + F[i][j]*E + F[i][j+1]*G
我正在处理它,我将在几个小时内发布结果代码,同时考虑到您的建议。