1

我正在使用一个爱好程序来自学高性能计算技术。

我的 PC 有一个 Intel Ivy Bridge Core i7 3770 处理器和 32 GB 内存和 Microsoft vs2010 C 编译器的免费版本。

64 位程序需要大约 20 GB 的内存,因为它有五个 4 GB 的查找表(下面的 bytevecM ... bytevecX)。这个搜索程序的内部循环被写成一个单独的 C 文件(因为我以后可能想用汇编版本替换它),如下所示:

#define H_PRIME 1000003
int inner(
   const char* bytevecM,
   const char* bytevecD,
   const char* bytevecC,
   const char* bytevecL,
   const char* bytevecX,
   int startval, int endval,
   int m2, int d2, int c2, int l2, int x2,
   int* ps
)
{
   int* psin = ps;
   int qqm;
   int m3, m4, m5, m6, m7;
   int d3, d4, d5, d6, d7;
   int c3, c4, c5, c6, c7;
   int l3, l4, l5, l6, l7;
   int x3, x4, x5, x6, x7;
   int q3, q4, q5, q6, q7, q8;
   for (q3 = startval; q3 < endval; ++q3)
   {
      if (q3 == 10 || q3 == 13) continue;
      m3 = (m2 ^ q3) * H_PRIME;
      d3 = (d2 ^ q3) * H_PRIME;
      c3 = (c2 ^ q3) * H_PRIME;
      l3 = (l2 ^ q3) * H_PRIME;
      x3 = (x2 ^ q3) * H_PRIME;
      for (q4 = 1; q4 < 128; ++q4)
      {
         if (q4 == 10 || q4 == 13) continue;
         m4 = (m3 ^ q4) * H_PRIME;
         d4 = (d3 ^ q4) * H_PRIME;
         c4 = (c3 ^ q4) * H_PRIME;
         l4 = (l3 ^ q4) * H_PRIME;
         x4 = (x3 ^ q4) * H_PRIME;
         for (q5 = 1; q5 < 128; ++q5)
         {
            if (q5 == 10 || q5 == 13) continue;
            m5 = (m4 ^ q5) * H_PRIME;
            d5 = (d4 ^ q5) * H_PRIME;
            c5 = (c4 ^ q5) * H_PRIME;
            l5 = (l4 ^ q5) * H_PRIME;
            x5 = (x4 ^ q5) * H_PRIME;
            for (q6 = 1; q6 < 128; ++q6)
            {
               if (q6 == 10 || q6 == 13) continue;
               m6 = (m5 ^ q6) * H_PRIME;
               d6 = (d5 ^ q6) * H_PRIME;
               c6 = (c5 ^ q6) * H_PRIME;
               l6 = (l5 ^ q6) * H_PRIME;
               x6 = (x5 ^ q6) * H_PRIME;
               for (q7 = 1; q7 < 128; ++q7)
               {
                  if (q7 == 10 || q7 == 13) continue;
                  m7 = (m6 ^ q7) * H_PRIME;
                  d7 = (d6 ^ q7) * H_PRIME;
                  c7 = (c6 ^ q7) * H_PRIME;
                  l7 = (l6 ^ q7) * H_PRIME;
                  x7 = (x6 ^ q7) * H_PRIME;
                  for (q8 = 1; q8 < 128; ++q8)
                  {
                     if (q8 == 10 || q8 == 13) continue;
                     qqm = bytevecM[(unsigned int)(m7 ^ q8)];
                     if (qqm != 0
                     &&  qqm == bytevecD[(unsigned int)(d7 ^ q8)]
                     &&  qqm == bytevecC[(unsigned int)(c7 ^ q8)]
                     &&  qqm == bytevecL[(unsigned int)(l7 ^ q8)]
                     &&  qqm == bytevecX[(unsigned int)(x7 ^ q8)])
                     {
                        *ps++ = q3; *ps++ = q4; *ps++ = q5;
                        *ps++ = q6; *ps++ = q7; *ps++ = q8;
                        *ps++ = qqm;
                     }
                  }
               }
            }
         }
      }
   }
   return (int)(ps - psin);
}

顺便说一下,通过在每个线程中以不同的开始和结束范围运行一个副本,上述算法很容易并行化。

使用直觉、英特尔内在函数并单独对每个更改进行基准测试,我能够将运行时间从大约 5 小时减少到 3 小时,如下所示:

#include <emmintrin.h>
#include <smmintrin.h>
#define H_PRIME 1000003
#define UNROLL(q8) qqm = bytevecM[(unsigned int)(m7 ^ q8)];    \
  if (qqm != 0                                                 \
  &&  qqm == bytevecD[(unsigned int)(s7.m128i_i32[0] ^ q8)]    \
  &&  qqm == bytevecC[(unsigned int)(s7.m128i_i32[1] ^ q8)]    \
  &&  qqm == bytevecL[(unsigned int)(s7.m128i_i32[2] ^ q8)]    \
  &&  qqm == bytevecX[(unsigned int)(s7.m128i_i32[3] ^ q8)]) { \
    ps[j++] = _mm_set_epi16(0, qqm, q8, q7, q6, q5, q4, q3); }
int inner(
   const char* bytevecM,
   const char* bytevecD,
   const char* bytevecC,
   const char* bytevecL,
   const char* bytevecX,
   int startval, int endval,
   int m2, int d2, int c2, int l2, int x2,
   __m128i* ps
)
{
   __m128i s2 = _mm_set_epi32(x2, l2, c2, d2);
   __m128i hp = _mm_set1_epi32(H_PRIME);
   __m128i xt[128];
   __m128i s3, s4, s5, s6, s7;
   int qqm;
   int m3, m4, m5, m6, m7;
   int q3, q4, q5, q6, q7;
   int j = 0;
   int z; for (z = 1; z < 128; ++z) { xt[z] = _mm_set1_epi32(z); }
   for (q3 = startval; q3 < endval; ++q3)
   {
      if (q3 == 10 || q3 == 13) continue;
      m3 = (m2 ^ q3) * H_PRIME;
      s3 = _mm_mullo_epi32(_mm_xor_si128(s2, xt[q3]), hp);
      for (q4 = 1; q4 < 128; ++q4)
      {
         if (q4 == 10 || q4 == 13) continue;
         m4 = (m3 ^ q4) * H_PRIME;
         s4 = _mm_mullo_epi32(_mm_xor_si128(s3, xt[q4]), hp);
         for (q5 = 1; q5 < 128; ++q5)
         {
            if (q5 == 10 || q5 == 13) continue;
            m5 = (m4 ^ q5) * H_PRIME;
            s5 = _mm_mullo_epi32(_mm_xor_si128(s4, xt[q5]), hp);
            for (q6 = 1; q6 < 128; ++q6)
            {
               if (q6 == 10 || q6 == 13) continue;
               m6 = (m5 ^ q6) * H_PRIME;
               s6 = _mm_mullo_epi32(_mm_xor_si128(s5, xt[q6]), hp);
               for (q7 = 1; q7 < 128; ++q7)
               {
                  if (q7 == 10 || q7 == 13) continue;
                  m7 = (m6 ^ q7) * H_PRIME;
                  s7 = _mm_mullo_epi32(_mm_xor_si128(s6, xt[q7]), hp);
                  UNROLL(1)
                  UNROLL(96)
                  UNROLL(2)
                  UNROLL(3)
                  UNROLL(4)
                  UNROLL(5)
                  UNROLL(6)
                  UNROLL(7)
                  UNROLL(8)
                  UNROLL(9)
                  UNROLL(11)
                  UNROLL(12)
                  UNROLL(14)
                  // ... snipped UNROLL(15) .. UNROLL(125)
                  UNROLL(126)
                  UNROLL(127)
               }
            }
         }
      }
   }
   return j;
}

这种加速大部分来自内部循环的手动展开。

由于我对英特尔 SSE/AVX 指令非常陌生,如果您刚刚看到上面的内容让您脸红,请告诉我。

Intel VTune 报告最大的热点出现在线路上:

UNROLL(1)

在对应的汇编代码中,热点如下图所示:

mov     eax, ecx                         0.917s
mov     edx, ecx                         0.062s
xor     rax, 0x1
movdqa  xmmword ptr [rsp+0x20], xmm0
mov     ebx, dword ptr [rsp+0x2c]        0.155s
mov     r11d, dword ptr [rsp+0x28]       0.949s
movsx   ecx, byte ptr [rax+rdi*1]        0.156s
mov     r9d, dword ptr [rsp+0x24]       91.132s
mov     r8d, dword ptr [rsp+0x20]        0.233s
test    ecx, ecx
jz      0x14000225b
---
mov     eax, r8d                         0.342s
xor     rax, 0x1                         0.047s
movsx   eax, byte ptr [rax+r13*1]        0.124s
cmp     ecx, eax                        12.631s
jnz     0x14000225b
---
mov     eax, r9d
xor     rax, 0x1
movsx   eax, byte ptr [rax+r12*1]
cmp     ecx, eax                         0.016s
jnz     0x14000225b

这对我来说似乎是一个“数据局部性”问题。每次通过内部循环时,m7 的值在 4 GB 范围内变化很大且不可预测,因此在查找 qqm=bytevecM[m7^1] 时,您可能会遇到第一个 UNROLL(1) 的缓存未命中。

由于随后的 UNROLL(2)..UNROLL(127) xor m7 与 2..127,您通常会在其余的 UNROLL 中获得缓存命中。奇怪的是,通过将 UNROLL(96) 移到 UNROLL(1) 之后来更改 UNROLL 的顺序,可以显着加快速度。

我知道从内存中读取一个字节会导致填充包含该字节的(64 字节)缓存行。

由于我对这个领域非常陌生,我欢迎任何关于如何加快内存查找的建议或良好参考,尤其是在处理大型表时(在我的例子中,4 GB 表)。

我看不到使用上述算法改善数据局部性的明显方法;也欢迎就如何实现这一目标提出建议。

2013 年 3 月 29 日更新

自从编写了这个节点以来,我已经能够将运行时间从 3 小时进一步减少到 20 分钟,如下所示。

为每个 4 GB bytevec 添加一个 4 MB 位图将其减少到大约 40 分钟,通过添加一些 _mm_prefetch 调用进一步减半。

请注意,基本算法保持不变:通过添加位图改善了数据局部性;通过添加 _mm_prefetch 调用减少了延迟。

欢迎提出进一步性能改进的建议。改进后的程序如下:

#include <emmintrin.h>
#include <smmintrin.h>
#define H_PRIME 1000003

#define UNROLL(qx) qqm = bytevecM[m7 ^ qx];         \
  if (qqm != 0                                      \
  &&  qqm == bytevecD[d7 ^ qx]) {                   \
    _mm_prefetch(&bytevecC[c7 ^ qx], _MM_HINT_T0);  \
    qd[nqd++] = qqm; qd[nqd++] = qx; }

int inner(
   const unsigned char* bitvecM,
   const unsigned char* bitvecD,
   const unsigned char* bitvecC,
   const unsigned char* bitvecL,
   const unsigned char* bitvecX,
   const unsigned char* bitvecV,
   const unsigned char* bitvecI,
   const unsigned char* bytevecM,
   const unsigned char* bytevecD,
   const unsigned char* bytevecC,
   const unsigned char* bytevecL,
   const unsigned char* bytevecX,
   int startval, int endval,
   int m2, int d2, int c2, int l2, int x2, int v2, int i2,
   __m128i* ps
)
{
   __declspec(align(16)) __m128i s2 = _mm_set_epi32(i2, v2, x2, l2);
   __declspec(align(16)) __m128i hp = _mm_set1_epi32(H_PRIME);
   __declspec(align(16)) __m128i xt[128];
   __declspec(align(16)) __m128i s3, s4, s5, s6;
   int m3, m4, m5, m6;
   int d3, d4, d5, d6;
   int c3, c4, c5, c6;
   unsigned int m7, d7, c7, l7, x7, v7, i7;
   int qqm;
   int q3, q4, q5, q6, q7, q8;
   int iret = 0;
   unsigned int qf[128*4];
   int nqf;
   int qz;
   int qd[128*2];
   int nqd;
   int cnt;
   int qh[128*3];
   int nqh;
   int qi[128*5];
   int nqi;
   unsigned int m7arr[128];
   unsigned int d7arr[128];
   const size_t* pbvM = (size_t*)bitvecM;
   const size_t* pbvD = (size_t*)bitvecD;
   const size_t* pbvC = (size_t*)bitvecC;
   const size_t* pbvL = (size_t*)bitvecL;
   const size_t* pbvX = (size_t*)bitvecX;
   const size_t* pbvV = (size_t*)bitvecV;
   const size_t* pbvI = (size_t*)bitvecI;
   int z; for (z = 1; z < 128; ++z) { xt[z] = _mm_set1_epi32(z); }

   for (q3 = startval; q3 < endval; ++q3)
   {
      if (q3 == 10 || q3 == 13) continue;
      m3 = (m2 ^ q3) * H_PRIME;
      d3 = (d2 ^ q3) * H_PRIME;
      c3 = (c2 ^ q3) * H_PRIME;
      s3 = _mm_mullo_epi32(_mm_xor_si128(s2, xt[q3]), hp);
      for (q4 = 1; q4 < 128; ++q4)
      {
         if (q4 == 10 || q4 == 13) continue;
         m4 = (m3 ^ q4) * H_PRIME;
         d4 = (d3 ^ q4) * H_PRIME;
         c4 = (c3 ^ q4) * H_PRIME;
         s4 = _mm_mullo_epi32(_mm_xor_si128(s3, xt[q4]), hp);
         for (q5 = 1; q5 < 128; ++q5)
         {
            if (q5 == 10 || q5 == 13) continue;
            m5 = (m4 ^ q5) * H_PRIME;
            d5 = (d4 ^ q5) * H_PRIME;
            c5 = (c4 ^ q5) * H_PRIME;
            s5 = _mm_mullo_epi32(_mm_xor_si128(s4, xt[q5]), hp);
            for (q6 = 1; q6 < 128; ++q6)
            {
               if (q6 == 10 || q6 == 13) continue;
               m6 = (m5 ^ q6) * H_PRIME;
               d6 = (d5 ^ q6) * H_PRIME;
               c6 = (c5 ^ q6) * H_PRIME;
               s6 = _mm_mullo_epi32(_mm_xor_si128(s5, xt[q6]), hp);
               for (q7 = 1; q7 < 128; ++q7)
               {
                  if (q7 == 10 || q7 == 13) continue;
                  m7arr[q7] = (unsigned int)( (m6 ^ q7) * H_PRIME );
                  _mm_prefetch((const char*)(&pbvM[m7arr[q7] >> 13]), _MM_HINT_T0);
                  d7arr[q7] = (unsigned int)( (d6 ^ q7) * H_PRIME );
                  _mm_prefetch((const char*)(&pbvD[d7arr[q7] >> 13]), _MM_HINT_T0);
               }
               nqh = 0;
               for (q7 = 1; q7 < 128; ++q7)
               {
                  if (q7 == 10 || q7 == 13) continue;
                  if ( (pbvM[m7arr[q7] >> 13] & ((size_t)1 << ((m7arr[q7] >> 7) & 63))) == 0 ) continue;
                  if ( (pbvD[d7arr[q7] >> 13] & ((size_t)1 << ((d7arr[q7] >> 7) & 63))) == 0 ) continue;
                  c7 = (unsigned int)( (c6 ^ q7) * H_PRIME );
                  _mm_prefetch((const char*)(&pbvC[c7 >> 13]), _MM_HINT_T0);
                  l7 = (unsigned int)( (s6.m128i_i32[0] ^ q7) * H_PRIME );
                  _mm_prefetch((const char*)(&pbvL[l7 >> 13]), _MM_HINT_T0);
                  qh[nqh++] = q7;
                  qh[nqh++] = c7;
                  qh[nqh++] = l7;
               }
               nqi = 0;
               cnt = 0;
               while (cnt < nqh)
               {
                  q7 = qh[cnt++];
                  c7 = qh[cnt++];
                  l7 = qh[cnt++];
                  if ( (pbvC[c7 >> 13] & ((size_t)1 << ((c7 >> 7) & 63))) == 0 ) continue;
                  if ( (pbvL[l7 >> 13] & ((size_t)1 << ((l7 >> 7) & 63))) == 0 ) continue;
                  x7 = (unsigned int)( (s6.m128i_i32[1] ^ q7) * H_PRIME );
                  _mm_prefetch((const char*)(&pbvX[x7 >> 13]), _MM_HINT_T0);
                  v7 = (unsigned int)( (s6.m128i_i32[2] ^ q7) * H_PRIME );
                  _mm_prefetch((const char*)(&pbvV[v7 >> 13]), _MM_HINT_T0);
                  qi[nqi++] = q7;
                  qi[nqi++] = c7;
                  qi[nqi++] = l7;
                  qi[nqi++] = x7;
                  qi[nqi++] = v7;
               }
               nqf = 0;
               cnt = 0;
               while (cnt < nqi)
               {
                  q7 = qi[cnt++];
                  c7 = qi[cnt++];
                  l7 = qi[cnt++];
                  x7 = qi[cnt++];
                  v7 = qi[cnt++];
                  if ( (pbvX[x7 >> 13] & ((size_t)1 << ((x7 >> 7) & 63))) == 0 ) continue;
                  if ( (pbvV[v7 >> 13] & ((size_t)1 << ((v7 >> 7) & 63))) == 0 ) continue;
                  i7 = (unsigned int)( (s6.m128i_i32[3] ^ q7) * H_PRIME );
                  if ( (pbvI[i7 >> 13] & ((size_t)1 << ((i7 >> 7) & 63))) == 0 ) continue;
                  _mm_prefetch(&bytevecD[d7arr[q7] & 0xffffff80], _MM_HINT_T0);
                  _mm_prefetch(&bytevecD[64+(d7arr[q7] & 0xffffff80)], _MM_HINT_T0);
                  qf[nqf++] = q7;
                  qf[nqf++] = c7;
                  qf[nqf++] = l7;
                  qf[nqf++] = x7;
               }
               cnt = 0;
               while (cnt < nqf)
               {
                  q7 = qf[cnt];
                  cnt += 4;
                  _mm_prefetch(&bytevecM[m7arr[q7] & 0xffffff80], _MM_HINT_T0);
                  _mm_prefetch(&bytevecM[64+(m7arr[q7] & 0xffffff80)], _MM_HINT_T0);
               }
               qz = 0;
               while (qz < nqf)
               {
                  q7 = qf[qz++];
                  c7 = qf[qz++];
                  l7 = qf[qz++];
                  x7 = qf[qz++];
                  m7 = m7arr[q7];
                  d7 = d7arr[q7];
                  nqd = 0;
                  UNROLL(1)
                  UNROLL(96)
                  UNROLL(2)
                  UNROLL(3)
                  UNROLL(4)
                  UNROLL(5)
                  UNROLL(6)
                  UNROLL(7)
                  UNROLL(8)
                  UNROLL(9)
                  UNROLL(11)
                  UNROLL(12)
                  UNROLL(14)
                  // ... snipped UNROLL(15) .. UNROLL(125)
                  UNROLL(126)
                  UNROLL(127)
                  if (nqd == 0) continue;
                  cnt = 0;
                  while (cnt < nqd)
                  {
                     qqm = qd[cnt++];
                     q8  = qd[cnt++];
                     if (qqm == bytevecC[c7 ^ q8]
                     &&  qqm == bytevecL[l7 ^ q8]
                     &&  qqm == bytevecX[x7 ^ q8])
                     {
                        ps[iret++] = _mm_set_epi16(0, qqm, q8, q7, q6, q5, q4, q3);
                     }
                  }
               }
            }
         }
      }
   }
   return iret;
}
4

2 回答 2

1

如果您确实需要以随机方式访问内存,那么您只能做一些装饰来节省主内存停顿的时间。硬件根本无法随机跳转到内存地址并在单个周期内获得可用数据。当然,总是有“技巧”,通常取决于确切的芯片组。不过,没有人想到这个问题。

对于非常高性能的系统,您通常需要仔细查看算法或技巧来避免工作。在您的另一个问题的上下文中考虑这个问题使用 C/Intel 程序集,测试 128 字节内存块是否包含全零的最快方法是什么?并且使用大多数时间内存为零的事实,您可以简单地设置每 128 字节甚至 512 字节 1 位,并将其用作测试完整块的短路。这可以看作是一个http://en.wikipedia.org/wiki/Bloom_filter,尽管我更喜欢将其视为每个条目 1 位的数组。

对于 4Gb 查找表,您将需要 31.5Mb(每 128 字节 1 位)或 7.8Mb(每 4x 128 字节 1 位等)。这里的目的是尝试让更小的存在指示符更可能是在缓存中。当然,查找表编写器会产生额外的内存写入。

另一种技术可能根本不适合,具体取决于数据如何写入查找数组,将值的地址存储在排序数组中,而不是数组本身。这将允许您使用 320Mb 作为地址值(如果我的数学是正确的)并允许您使用二进制搜索,但可以为初始探测带来良好的缓存效率。

或者,不是立即执行最终的内部循环,而是保存所需的输入变量,然后继续进行下一个封闭循环迭代。完成后,按内存地址对这个内部循环命中列表进行排序,然后在它们自己的循环中执行它们。这种技术有很多陷阱,只有在您可能重用相同的内存块时才真正有帮助,但这种方法在现实世界中使用

于 2013-03-02T19:23:40.740 回答
1

几个想法:

我会尝试对这件作品进行变体:

#define UNROLL(q8) qqm = bytevecM[(unsigned int)(m7 ^ q8)];    \
  if (qqm != 0                                                 \
  &&  qqm == bytevecD[(unsigned int)(s7.m128i_i32[0] ^ q8)]    \
  &&  qqm == bytevecC[(unsigned int)(s7.m128i_i32[1] ^ q8)]    \
  &&  qqm == bytevecL[(unsigned int)(s7.m128i_i32[2] ^ q8)]    \
  &&  qqm == bytevecX[(unsigned int)(s7.m128i_i32[3] ^ q8)]) { \
    ps[j++] = _mm_set_epi16(0, qqm, q8, q7, q6, q5, q4, q3); }

如果您进行以下更改,它可能会表现更好:

  1. 使用PXOR而不是四个单独的XOR指令
  2. 拉出宏,即在最里面的循环中做_mm_set_epi16() 第一
    __m128i pp = _mm_set_epi16(0, 0, 0, q7, q6, q5, q4, q3);
    件事。
  3. 插入迭代计数器q8qqm需要的位置/时间

随着这些变化,它会变成这样:

#define UNROLL(q8)                                               \
qqm = bytevecM[(unsigned int)(m7 ^ q8)];                         \
if (qqm) {                                                       \
    pp = _mm_insert_epi16(pp, q8, 2);                            \
    __m128i qq = _mm_xor_si128(_mm_set1_epi32(q8), s7));         \
    if (qqm == bytevecD[qq.m128i_i32[0]] &&                      \
        qqm == bytevecC[qq.m128i_i32[1]] &&                      \
        qqm == bytevecL[qq.m128i_i32[2]] &&                      \
        qqm == bytevecX[qq.m128i_i32[3]]))                       \
        ps[j++] = _mm_insert_epi16(pp, qqm, 1);                  \
}

此外,您从提早获得加速的事实UNROLL(96)意味着您在那时为 s 的所有 128 字节部分预填充了第二个缓存bytevec线。这也应该可以通过添加:

_mm_prefetch(&bytevecM[m7 | 0x60], _MM_HINT_T0);
_mm_prefetch(&bytevecD[s7.m128i_i32[0] | 0x60], _MM_HINT_T0);
_mm_prefetch(&bytevecC[s7.m128i_i32[1] | 0x60], _MM_HINT_T0);
_mm_prefetch(&bytevecL[s7.m128i_i32[2] | 0x60], _MM_HINT_T0);
_mm_prefetch(&bytevecX[s7.m128i_i32[3] | 0x60], _MM_HINT_T0);

前第一UNROLL(1)

于 2013-03-12T18:45:40.910 回答