我正在使用一个爱好程序来自学高性能计算技术。
我的 PC 有一个 Intel Ivy Bridge Core i7 3770 处理器和 32 GB 内存和 Microsoft vs2010 C 编译器的免费版本。
64 位程序需要大约 20 GB 的内存,因为它有五个 4 GB 的查找表(下面的 bytevecM ... bytevecX)。这个搜索程序的内部循环被写成一个单独的 C 文件(因为我以后可能想用汇编版本替换它),如下所示:
#define H_PRIME 1000003

/*
 * Brute-force search kernel (plain-C reference version).
 *
 * Enumerates candidate byte strings q3..q8 (each byte in 1..127, skipping
 * 10 '\n' and 13 '\r'; q3 is restricted to [startval, endval) so the work
 * can be partitioned across threads).  Five independent multiplicative
 * hash chains are rolled in parallel, one per 4 GB lookup table:
 *
 *     h_next = (h_prev ^ q) * H_PRIME        (mod 2^32)
 *
 * seeded by m2, d2, c2, l2, x2.  A candidate matches when the byte at
 * bytevecM[final M-hash] is non-zero AND equals the bytes found in the
 * other four tables at their respective final hashes.
 *
 * Matches are appended to ps as 7 ints each: q3, q4, q5, q6, q7, q8, qqm.
 * Returns the total number of ints written (7 * number of matches).
 *
 * Fix vs. the original: the hash chains are now computed in `unsigned int`.
 * The original multiplied *signed* ints, and signed overflow is undefined
 * behavior in C; unsigned arithmetic wraps mod 2^32, which is exactly what
 * the table indexing (the original casts to unsigned int) already assumed,
 * so the results are unchanged.
 */
int inner(
    const char* bytevecM,
    const char* bytevecD,
    const char* bytevecC,
    const char* bytevecL,
    const char* bytevecX,
    int startval, int endval,
    int m2, int d2, int c2, int l2, int x2,
    int* ps
)
{
    int* psin = ps;                       /* remember start to return the count */
    int qqm;
    unsigned int m3, m4, m5, m6, m7;      /* unsigned: wraparound is well-defined */
    unsigned int d3, d4, d5, d6, d7;
    unsigned int c3, c4, c5, c6, c7;
    unsigned int l3, l4, l5, l6, l7;
    unsigned int x3, x4, x5, x6, x7;
    int q3, q4, q5, q6, q7, q8;
    for (q3 = startval; q3 < endval; ++q3)
    {
        if (q3 == 10 || q3 == 13) continue;   /* skip '\n' and '\r' */
        m3 = ((unsigned int)(m2 ^ q3)) * H_PRIME;
        d3 = ((unsigned int)(d2 ^ q3)) * H_PRIME;
        c3 = ((unsigned int)(c2 ^ q3)) * H_PRIME;
        l3 = ((unsigned int)(l2 ^ q3)) * H_PRIME;
        x3 = ((unsigned int)(x2 ^ q3)) * H_PRIME;
        for (q4 = 1; q4 < 128; ++q4)
        {
            if (q4 == 10 || q4 == 13) continue;
            m4 = (m3 ^ q4) * H_PRIME;
            d4 = (d3 ^ q4) * H_PRIME;
            c4 = (c3 ^ q4) * H_PRIME;
            l4 = (l3 ^ q4) * H_PRIME;
            x4 = (x3 ^ q4) * H_PRIME;
            for (q5 = 1; q5 < 128; ++q5)
            {
                if (q5 == 10 || q5 == 13) continue;
                m5 = (m4 ^ q5) * H_PRIME;
                d5 = (d4 ^ q5) * H_PRIME;
                c5 = (c4 ^ q5) * H_PRIME;
                l5 = (l4 ^ q5) * H_PRIME;
                x5 = (x4 ^ q5) * H_PRIME;
                for (q6 = 1; q6 < 128; ++q6)
                {
                    if (q6 == 10 || q6 == 13) continue;
                    m6 = (m5 ^ q6) * H_PRIME;
                    d6 = (d5 ^ q6) * H_PRIME;
                    c6 = (c5 ^ q6) * H_PRIME;
                    l6 = (l5 ^ q6) * H_PRIME;
                    x6 = (x5 ^ q6) * H_PRIME;
                    for (q7 = 1; q7 < 128; ++q7)
                    {
                        if (q7 == 10 || q7 == 13) continue;
                        m7 = (m6 ^ q7) * H_PRIME;
                        d7 = (d6 ^ q7) * H_PRIME;
                        c7 = (c6 ^ q7) * H_PRIME;
                        l7 = (l6 ^ q7) * H_PRIME;
                        x7 = (x6 ^ q7) * H_PRIME;
                        for (q8 = 1; q8 < 128; ++q8)
                        {
                            if (q8 == 10 || q8 == 13) continue;
                            /* M-table lookup first: cheap reject for most candidates */
                            qqm = bytevecM[m7 ^ q8];
                            if (qqm != 0
                                && qqm == bytevecD[d7 ^ q8]
                                && qqm == bytevecC[c7 ^ q8]
                                && qqm == bytevecL[l7 ^ q8]
                                && qqm == bytevecX[x7 ^ q8])
                            {
                                *ps++ = q3; *ps++ = q4; *ps++ = q5;
                                *ps++ = q6; *ps++ = q7; *ps++ = q8;
                                *ps++ = qqm;
                            }
                        }
                    }
                }
            }
        }
    }
    return (int)(ps - psin);
}
顺便说一下,通过在每个线程中以不同的开始和结束范围运行一个副本,上述算法很容易并行化。
凭借直觉和英特尔内在函数(intrinsics),并对每个更改单独进行基准测试,我把运行时间从大约 5 小时减少到了 3 小时,如下所示:
#include <emmintrin.h>
#include <smmintrin.h>
#define H_PRIME 1000003
/*
 * Innermost step for one candidate byte `q8`, expanded by hand (the manual
 * unrolling of the q8 loop is where most of this version's speedup came from).
 *
 * The M table is tested first as a cheap reject: qqm must be non-zero, then
 * equal to the D, C, L and X table bytes.  The D/C/L/X hash values for the
 * current q7 level live in the four 32-bit lanes of `s7` — lane [0]=D,
 * [1]=C, [2]=L, [3]=X — accessed via the MSVC-specific `m128i_i32` union
 * member.  The M chain is kept in the scalar `m7`.  On a full match the
 * candidate (q3..q8, qqm) is packed into one __m128i slot of `ps`.
 */
#define UNROLL(q8) qqm = bytevecM[(unsigned int)(m7 ^ q8)]; \
if (qqm != 0 \
&& qqm == bytevecD[(unsigned int)(s7.m128i_i32[0] ^ q8)] \
&& qqm == bytevecC[(unsigned int)(s7.m128i_i32[1] ^ q8)] \
&& qqm == bytevecL[(unsigned int)(s7.m128i_i32[2] ^ q8)] \
&& qqm == bytevecX[(unsigned int)(s7.m128i_i32[3] ^ q8)]) { \
ps[j++] = _mm_set_epi16(0, qqm, q8, q7, q6, q5, q4, q3); }
/*
 * SSE4.1 version of the search kernel: the D, C, L and X hash chains are
 * rolled 4-wide in a single __m128i (s3..s7) via _mm_xor_si128 +
 * _mm_mullo_epi32, while the M chain stays scalar.  Search semantics are
 * identical to the plain-C version.  Returns the number of matches written
 * to ps (one __m128i per match).
 */
int inner(
const char* bytevecM,
const char* bytevecD,
const char* bytevecC,
const char* bytevecL,
const char* bytevecX,
int startval, int endval,
int m2, int d2, int c2, int l2, int x2,
__m128i* ps
)
{
__m128i s2 = _mm_set_epi32(x2, l2, c2, d2);  /* lanes: [0]=d2 [1]=c2 [2]=l2 [3]=x2 */
__m128i hp = _mm_set1_epi32(H_PRIME);        /* H_PRIME broadcast to all 4 lanes */
__m128i xt[128];                             /* xt[z] = z broadcast to all 4 lanes */
__m128i s3, s4, s5, s6, s7;
int qqm;
int m3, m4, m5, m6, m7;
int q3, q4, q5, q6, q7;
int j = 0;
/* Precompute broadcast constants so each loop level does one xor + one mul.
   NOTE(review): xt[q3] assumes startval/endval stay within 1..127 — confirm. */
int z; for (z = 1; z < 128; ++z) { xt[z] = _mm_set1_epi32(z); }
for (q3 = startval; q3 < endval; ++q3)
{
if (q3 == 10 || q3 == 13) continue;  /* skip '\n' and '\r' at every level */
m3 = (m2 ^ q3) * H_PRIME;
s3 = _mm_mullo_epi32(_mm_xor_si128(s2, xt[q3]), hp);
for (q4 = 1; q4 < 128; ++q4)
{
if (q4 == 10 || q4 == 13) continue;
m4 = (m3 ^ q4) * H_PRIME;
s4 = _mm_mullo_epi32(_mm_xor_si128(s3, xt[q4]), hp);
for (q5 = 1; q5 < 128; ++q5)
{
if (q5 == 10 || q5 == 13) continue;
m5 = (m4 ^ q5) * H_PRIME;
s5 = _mm_mullo_epi32(_mm_xor_si128(s4, xt[q5]), hp);
for (q6 = 1; q6 < 128; ++q6)
{
if (q6 == 10 || q6 == 13) continue;
m6 = (m5 ^ q6) * H_PRIME;
s6 = _mm_mullo_epi32(_mm_xor_si128(s5, xt[q6]), hp);
for (q7 = 1; q7 < 128; ++q7)
{
if (q7 == 10 || q7 == 13) continue;
m7 = (m6 ^ q7) * H_PRIME;
s7 = _mm_mullo_epi32(_mm_xor_si128(s6, xt[q7]), hp);
UNROLL(1)
/* 96 placed second on purpose: measured faster than numeric order
   (presumably warms cache lines for the later lookups — see notes below). */
UNROLL(96)
UNROLL(2)
UNROLL(3)
UNROLL(4)
UNROLL(5)
UNROLL(6)
UNROLL(7)
UNROLL(8)
UNROLL(9)
UNROLL(11)
UNROLL(12)
UNROLL(14)
// ... snipped UNROLL(15) .. UNROLL(125)
UNROLL(126)
UNROLL(127)
}
}
}
}
}
return j;
}
这种加速大部分来自内部循环的手动展开。
由于我对英特尔 SSE/AVX 指令非常陌生,如果您刚刚看到上面的内容让您脸红,请告诉我。
Intel VTune 报告最大的热点出现在下面这行代码上:
UNROLL(1)
在对应的汇编代码中,热点如下所示(每行右侧为采样到的耗时):
mov eax, ecx 0.917s
mov edx, ecx 0.062s
xor rax, 0x1
movdqa xmmword ptr [rsp+0x20], xmm0
mov ebx, dword ptr [rsp+0x2c] 0.155s
mov r11d, dword ptr [rsp+0x28] 0.949s
movsx ecx, byte ptr [rax+rdi*1] 0.156s
mov r9d, dword ptr [rsp+0x24] 91.132s
mov r8d, dword ptr [rsp+0x20] 0.233s
test ecx, ecx
jz 0x14000225b
---
mov eax, r8d 0.342s
xor rax, 0x1 0.047s
movsx eax, byte ptr [rax+r13*1] 0.124s
cmp ecx, eax 12.631s
jnz 0x14000225b
---
mov eax, r9d
xor rax, 0x1
movsx eax, byte ptr [rax+r12*1]
cmp ecx, eax 0.016s
jnz 0x14000225b
这对我来说似乎是一个“数据局部性”问题。每次通过内部循环时,m7 的值在 4 GB 范围内变化很大且不可预测,因此在查找 qqm=bytevecM[m7^1] 时,您可能会遇到第一个 UNROLL(1) 的缓存未命中。
由于随后的 UNROLL(2)..UNROLL(127) xor m7 与 2..127,您通常会在其余的 UNROLL 中获得缓存命中。奇怪的是,通过将 UNROLL(96) 移到 UNROLL(1) 之后来更改 UNROLL 的顺序,可以显着加快速度。
我知道从内存中读取一个字节会导致填充包含该字节的(64 字节)缓存行。
由于我对这个领域非常陌生,我欢迎任何关于如何加快内存查找的建议或良好参考,尤其是在处理大型表时(在我的例子中,4 GB 表)。
我看不到使用上述算法改善数据局部性的明显方法;也欢迎就如何实现这一目标提出建议。
2013 年 3 月 29 日更新
自从编写了这个节点以来,我已经能够将运行时间从 3 小时进一步减少到 20 分钟,如下所示。
为每个 4 GB bytevec 添加一个 4 MB 位图将其减少到大约 40 分钟,通过添加一些 _mm_prefetch 调用进一步减半。
请注意,基本算法保持不变:通过添加位图改善了数据局部性;通过添加 _mm_prefetch 调用减少了延迟。
欢迎提出进一步性能改进的建议。改进后的程序如下:
#include <emmintrin.h>
#include <smmintrin.h>
#define H_PRIME 1000003
/*
 * Final q8 step: reject via the M byte table, then the D byte table.  On
 * survival, prefetch the C-table byte for this candidate and queue the
 * (qqm, qx) pair in qd[] for the deferred C/L/X verification pass below.
 */
#define UNROLL(qx) qqm = bytevecM[m7 ^ qx]; \
if (qqm != 0 \
&& qqm == bytevecD[d7 ^ qx]) { \
_mm_prefetch(&bytevecC[c7 ^ qx], _MM_HINT_T0); \
qd[nqd++] = qqm; qd[nqd++] = qx; }
/*
 * Staged version of the search kernel (runtime 3 h -> 20 min per the notes
 * above).  Two ideas on top of the SSE version:
 *
 *   1. Per-table bitmaps (bitvec*) act as cheap pre-filters consulted
 *      before touching the 4 GB byte tables.  Addressing: word =
 *      pbv[hash >> 13], bit = (hash >> 7) & 63 — i.e. one bit per 128-byte
 *      block of the byte table, 64 bits per size_t word.
 *      NOTE(review): this layout assumes 64-bit size_t; the &63 math would
 *      be wrong on a 32-bit build — confirm.
 *   2. Work at the q7 level is split into pipeline stages; each stage
 *      issues _mm_prefetch for the data the *next* stage reads, so memory
 *      latency overlaps with useful work.
 *
 * This version tracks seven hash chains (m, d, c scalar; l, x, v, i in the
 * lanes of s2..s6: [0]=l [1]=x [2]=v [3]=i).  Matches are packed as
 * _mm_set_epi16(0, qqm, q8, q7, q6, q5, q4, q3); returns the number of
 * matches written to ps.
 */
int inner(
const unsigned char* bitvecM,
const unsigned char* bitvecD,
const unsigned char* bitvecC,
const unsigned char* bitvecL,
const unsigned char* bitvecX,
const unsigned char* bitvecV,
const unsigned char* bitvecI,
const unsigned char* bytevecM,
const unsigned char* bytevecD,
const unsigned char* bytevecC,
const unsigned char* bytevecL,
const unsigned char* bytevecX,
int startval, int endval,
int m2, int d2, int c2, int l2, int x2, int v2, int i2,
__m128i* ps
)
{
__declspec(align(16)) __m128i s2 = _mm_set_epi32(i2, v2, x2, l2);  /* lanes: [0]=l2 [1]=x2 [2]=v2 [3]=i2 */
__declspec(align(16)) __m128i hp = _mm_set1_epi32(H_PRIME);
__declspec(align(16)) __m128i xt[128];  /* xt[z] = z broadcast to all 4 lanes */
__declspec(align(16)) __m128i s3, s4, s5, s6;
int m3, m4, m5, m6;
int d3, d4, d5, d6;
int c3, c4, c5, c6;
unsigned int m7, d7, c7, l7, x7, v7, i7;
int qqm;
int q3, q4, q5, q6, q7, q8;
int iret = 0;
/* Per-stage work queues; sizes bound by 127 q7 values times entry width. */
unsigned int qf[128*4];   /* stage-4 survivors: (q7, c7, l7, x7) */
int nqf;
int qz;
int qd[128*2];            /* per-q7 byte-table survivors: (qqm, q8) pairs */
int nqd;
int cnt;
int qh[128*3];            /* stage-2 survivors: (q7, c7, l7) */
int nqh;
int qi[128*5];            /* stage-3 survivors: (q7, c7, l7, x7, v7) */
int nqi;
unsigned int m7arr[128];  /* m7/d7 hashes indexed by q7, reused across stages */
unsigned int d7arr[128];
/* Bitmaps viewed as arrays of 64-bit words for one-load bit tests. */
const size_t* pbvM = (size_t*)bitvecM;
const size_t* pbvD = (size_t*)bitvecD;
const size_t* pbvC = (size_t*)bitvecC;
const size_t* pbvL = (size_t*)bitvecL;
const size_t* pbvX = (size_t*)bitvecX;
const size_t* pbvV = (size_t*)bitvecV;
const size_t* pbvI = (size_t*)bitvecI;
int z; for (z = 1; z < 128; ++z) { xt[z] = _mm_set1_epi32(z); }
for (q3 = startval; q3 < endval; ++q3)
{
if (q3 == 10 || q3 == 13) continue;  /* skip '\n' and '\r' at every level */
m3 = (m2 ^ q3) * H_PRIME;
d3 = (d2 ^ q3) * H_PRIME;
c3 = (c2 ^ q3) * H_PRIME;
s3 = _mm_mullo_epi32(_mm_xor_si128(s2, xt[q3]), hp);
for (q4 = 1; q4 < 128; ++q4)
{
if (q4 == 10 || q4 == 13) continue;
m4 = (m3 ^ q4) * H_PRIME;
d4 = (d3 ^ q4) * H_PRIME;
c4 = (c3 ^ q4) * H_PRIME;
s4 = _mm_mullo_epi32(_mm_xor_si128(s3, xt[q4]), hp);
for (q5 = 1; q5 < 128; ++q5)
{
if (q5 == 10 || q5 == 13) continue;
m5 = (m4 ^ q5) * H_PRIME;
d5 = (d4 ^ q5) * H_PRIME;
c5 = (c4 ^ q5) * H_PRIME;
s5 = _mm_mullo_epi32(_mm_xor_si128(s4, xt[q5]), hp);
for (q6 = 1; q6 < 128; ++q6)
{
if (q6 == 10 || q6 == 13) continue;
m6 = (m5 ^ q6) * H_PRIME;
d6 = (d5 ^ q6) * H_PRIME;
c6 = (c5 ^ q6) * H_PRIME;
s6 = _mm_mullo_epi32(_mm_xor_si128(s5, xt[q6]), hp);
/* Stage 1: compute all M/D hashes for q7 = 1..127 and prefetch the
   bitmap words they will test in stage 2. */
for (q7 = 1; q7 < 128; ++q7)
{
if (q7 == 10 || q7 == 13) continue;
m7arr[q7] = (unsigned int)( (m6 ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvM[m7arr[q7] >> 13]), _MM_HINT_T0);
d7arr[q7] = (unsigned int)( (d6 ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvD[d7arr[q7] >> 13]), _MM_HINT_T0);
}
/* Stage 2: filter through the M and D bitmaps; for survivors compute
   the C/L hashes, prefetch their bitmap words, and queue (q7,c7,l7). */
nqh = 0;
for (q7 = 1; q7 < 128; ++q7)
{
if (q7 == 10 || q7 == 13) continue;
if ( (pbvM[m7arr[q7] >> 13] & ((size_t)1 << ((m7arr[q7] >> 7) & 63))) == 0 ) continue;
if ( (pbvD[d7arr[q7] >> 13] & ((size_t)1 << ((d7arr[q7] >> 7) & 63))) == 0 ) continue;
c7 = (unsigned int)( (c6 ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvC[c7 >> 13]), _MM_HINT_T0);
l7 = (unsigned int)( (s6.m128i_i32[0] ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvL[l7 >> 13]), _MM_HINT_T0);
qh[nqh++] = q7;
qh[nqh++] = c7;
qh[nqh++] = l7;
}
/* Stage 3: filter through the C and L bitmaps; compute X/V hashes,
   prefetch their bitmap words, and queue (q7,c7,l7,x7,v7). */
nqi = 0;
cnt = 0;
while (cnt < nqh)
{
q7 = qh[cnt++];
c7 = qh[cnt++];
l7 = qh[cnt++];
if ( (pbvC[c7 >> 13] & ((size_t)1 << ((c7 >> 7) & 63))) == 0 ) continue;
if ( (pbvL[l7 >> 13] & ((size_t)1 << ((l7 >> 7) & 63))) == 0 ) continue;
x7 = (unsigned int)( (s6.m128i_i32[1] ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvX[x7 >> 13]), _MM_HINT_T0);
v7 = (unsigned int)( (s6.m128i_i32[2] ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvV[v7 >> 13]), _MM_HINT_T0);
qi[nqi++] = q7;
qi[nqi++] = c7;
qi[nqi++] = l7;
qi[nqi++] = x7;
qi[nqi++] = v7;
}
/* Stage 4: filter through the X, V and I bitmaps.  For survivors,
   prefetch the D byte-table block: q8 < 128 only flips the low 7 bits
   of d7, so all 127 lookups land in the one 128-byte block selected by
   (d7 & 0xffffff80) — two cache lines. */
nqf = 0;
cnt = 0;
while (cnt < nqi)
{
q7 = qi[cnt++];
c7 = qi[cnt++];
l7 = qi[cnt++];
x7 = qi[cnt++];
v7 = qi[cnt++];
if ( (pbvX[x7 >> 13] & ((size_t)1 << ((x7 >> 7) & 63))) == 0 ) continue;
if ( (pbvV[v7 >> 13] & ((size_t)1 << ((v7 >> 7) & 63))) == 0 ) continue;
i7 = (unsigned int)( (s6.m128i_i32[3] ^ q7) * H_PRIME );
if ( (pbvI[i7 >> 13] & ((size_t)1 << ((i7 >> 7) & 63))) == 0 ) continue;
_mm_prefetch(&bytevecD[d7arr[q7] & 0xffffff80], _MM_HINT_T0);
_mm_prefetch(&bytevecD[64+(d7arr[q7] & 0xffffff80)], _MM_HINT_T0);
qf[nqf++] = q7;
qf[nqf++] = c7;
qf[nqf++] = l7;
qf[nqf++] = x7;
}
/* Stage 5: prefetch the M byte-table block (same 128-byte / two-line
   argument as above) for every surviving q7 before the unrolled scan. */
cnt = 0;
while (cnt < nqf)
{
q7 = qf[cnt];
cnt += 4;
_mm_prefetch(&bytevecM[m7arr[q7] & 0xffffff80], _MM_HINT_T0);
_mm_prefetch(&bytevecM[64+(m7arr[q7] & 0xffffff80)], _MM_HINT_T0);
}
/* Stage 6: hand-unrolled q8 scan against the M and D byte tables
   (UNROLL queues survivors into qd[] and prefetches their C bytes). */
qz = 0;
while (qz < nqf)
{
q7 = qf[qz++];
c7 = qf[qz++];
l7 = qf[qz++];
x7 = qf[qz++];
m7 = m7arr[q7];
d7 = d7arr[q7];
nqd = 0;
UNROLL(1)
/* 96 placed second on purpose — measured faster than numeric order. */
UNROLL(96)
UNROLL(2)
UNROLL(3)
UNROLL(4)
UNROLL(5)
UNROLL(6)
UNROLL(7)
UNROLL(8)
UNROLL(9)
UNROLL(11)
UNROLL(12)
UNROLL(14)
// ... snipped UNROLL(15) .. UNROLL(125)
UNROLL(126)
UNROLL(127)
if (nqd == 0) continue;
/* Stage 7: final verification of queued (qqm, q8) pairs against the
   C, L and X byte tables; full matches are packed into ps. */
cnt = 0;
while (cnt < nqd)
{
qqm = qd[cnt++];
q8 = qd[cnt++];
if (qqm == bytevecC[c7 ^ q8]
&& qqm == bytevecL[l7 ^ q8]
&& qqm == bytevecX[x7 ^ q8])
{
ps[iret++] = _mm_set_epi16(0, qqm, q8, q7, q6, q5, q4, q3);
}
}
}
}
}
}
}
return iret;
}