7

我有一个稀疏数组a(大部分为零):

unsigned char a[1000000]; 

我想创建一个索引数组,以在带有 AVX2 的英特尔 x64 架构上使用 SIMD 指令b的非零元素。a我正在寻找如何有效地做到这一点的提示。具体来说,是否有 SIMD 指令来获取 SIMD 寄存器中连续非零元素的位置,并连续排列?

4

3 回答 3

6

计算非零索引的五种方法是:

  • 半向量化循环:加载带有字符的 SIMD 向量,与零进行比较并应用移动掩码。如果任何字符不为零(@stgatilov也建议),请使用一个小的标量循环。这适用于非常稀疏的数组。下面代码中的函数arr2ind_movmsk使用 BMI1 指令进行标量循环。

  • 矢量化循环:英特尔 Haswell 处理器和更新的处理器支持 BMI1 和 BMI2 指令集。BMI2 包含pext指令(并行位提取,请参阅维基百科链接),结果证明它在这里很有用。请参阅arr2ind_pext下面的代码。

  • 带有 if 语句的经典标量循环:arr2ind_if

  • 没有分支的标量循环:arr2ind_cmov.

  • 查找表:@stgatilov表明可以使用查找表代替 pdep 和其他整数指令。这可能工作得很好,但是,查找表非常大:它不适合 L1 缓存。这里没有测试。另请参阅此处的讨论。


/* 
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c

example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros:   
              ./a.out 20000 25
*/

#include <stdio.h>
#include <immintrin.h>
#include <stdint.h>
#include <omp.h> 
#include <string.h>


__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0, k;
   __m256i msk;
   m0=0;
   for (i=0;i<n;i=i+32){                              /* Load 32 bytes and compare with zero:           */
      msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256());
      k=_mm256_movemask_epi8(msk);
      k=~k;                                           /* Search for nonzero bits instead of zero bits.  */
      while (k){
         ind[m0]=i+_tzcnt_u32(k);                     /* Count the number of trailing zero bits in k.   */
         m0++;
         k=_blsr_u32(k);                              /* Clear the lowest set bit in k.                 */
      }
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   uint64_t     cntr_const = 0xFEDCBA9876543210;
   __m256i      shft       = _mm256_set_epi64x(0x04,0x00,0x04,0x00);
   __m256i      vmsk       = _mm256_set1_epi8(0x0F);
   __m256i      cnst16     = _mm256_set1_epi32(16);
   __m256i      shf_lo     = _mm256_set_epi8(0x80,0x80,0x80,0x0B,   0x80,0x80,0x80,0x03,   0x80,0x80,0x80,0x0A,   0x80,0x80,0x80,0x02,
                                             0x80,0x80,0x80,0x09,   0x80,0x80,0x80,0x01,   0x80,0x80,0x80,0x08,   0x80,0x80,0x80,0x00);
   __m256i      shf_hi     = _mm256_set_epi8(0x80,0x80,0x80,0x0F,   0x80,0x80,0x80,0x07,   0x80,0x80,0x80,0x0E,   0x80,0x80,0x80,0x06,
                                             0x80,0x80,0x80,0x0D,   0x80,0x80,0x80,0x05,   0x80,0x80,0x80,0x0C,   0x80,0x80,0x80,0x04);
   __m128i      pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80,  0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);                                            

   __m256i      i_vec      = _mm256_setzero_si256();
   m0=0;
   for (i=0;i<n;i=i+16){
      __m128i   v          = _mm_load_si128((__m128i *)&a[i]);                     /* Load 16 bytes.                                                                               */
      __m128i   msk        = _mm_cmpeq_epi8(v,_mm_setzero_si128());                /* Generate 16x8 bit mask.                                                                      */
                msk        = _mm_srli_epi64(msk,4);                                /* Pack 16x8 bit mask to 16x4 bit mask.                                                         */
                msk        = _mm_shuffle_epi8(msk,pshufbcnst);                     /* Pack 16x8 bit mask to 16x4 bit mask.                                                         */
                msk        = _mm_xor_si128(msk,_mm_set1_epi32(-1));                /* Invert 16x4 mask.                                                                            */
      uint64_t  msk64      = _mm_cvtsi128_si64x(msk);                              /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/
      int       p          = _mm_popcnt_u64(msk64)>>2;                             /* p is the number of nonzeros in 16 bytes of a.                                                */
      uint64_t  cntr       = _pext_u64(cntr_const,msk64);                          /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */
                                                                                   /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers.                    */  
      __m256i   cntr256    = _mm256_set1_epi64x(cntr);
                cntr256    = _mm256_srlv_epi64(cntr256,shft);
                cntr256    = _mm256_and_si256(cntr256,vmsk);
      __m256i   cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo);
      __m256i   cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi);
                cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo);
                cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi);

                             _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo);     /* Note that the stores of iteration i and i+16 may overlap.                                                         */
                             _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi);   /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */ 
                m0         = m0+p;
                i_vec      = _mm256_add_epi32(i_vec,cnst16);
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      if (a[i]!=0){
         ind[m0]=i;
         m0=m0+1;
      }
   }
   *m=m0;
   return 0;
}


__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      ind[m0]=i;
      m0=(a[i]==0)? m0 : m0+1;   /* Compiles to cmov instruction. */
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;
   for (i=0;i<m;i++) printf("i=%d,  ind[i]=%d   a[ind[i]]=%u\n",i,ind[i],a[ind[i]]);
   printf("\n");   fflush( stdout );
   return 0;
}


__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;                              /* Compute a hash to compare the results of different methods. */
   unsigned int chk=0;
   for (i=0;i<m;i++){
      chk=((chk<<1)|(chk>>31))^(ind[i]);
   }
   printf("chk = %10X\n",chk);
   return 0;
}



int main(int argc, char **argv){
int n, i, m; 
unsigned int j, k, d;
unsigned char *a;
int *ind;
double t0,t1;
int meth, nrep;
char txt[30];

sscanf(argv[1],"%d",&n);            /* Length of array a.                                    */
n=n>>5;                             /* Adjust n to a multiple of 32.                         */
n=n<<5;
sscanf(argv[2],"%u",&d);            /* The approximate fraction of nonzeros in a is: d/1024  */
printf("n=%d,   d=%u\n",n,d);

a=_mm_malloc(n*sizeof(char),32);
ind=_mm_malloc(n*sizeof(int),32);    

                                    /* Generate a pseudo random array a.                     */
j=73659343;                   
for (i=0;i<n;i++){
   j=j*653+1;
   k=(j & 0x3FF00)>>8;              /* k is a pseudo random number between 0 and 1023        */
   if (k<d){
      a[i] = (j&0xFE)+1;            /* Set a[i] to nonzero.                                  */
   }else{
      a[i] = 0;
   }

}

/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d,   a[i]=%u\n",i,a[i]);}} printf("\n");   */ /* Uncomment this line to print the nonzeros in a. */ 

char txt0[]="arr2ind_movmsk: ";
char txt1[]="arr2ind_pext:   ";
char txt2[]="arr2ind_if:     ";
char txt3[]="arr2ind_cmov:   ";

nrep=10000;                                   /* Repeat a function nrep times to make relatively accurate timings possible.                          */
                                              /* With nrep=1000000:   ./a.out 10016 4 ;   ./a.out 10016 48 ;   ./a.out 10016 519                    */
                                              /* With nrep=10000:     ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513                  */
printf("nrep = \%d  \n\n",nrep);
arr2ind_movmsk(a,n,ind,&m);                   /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */
for (meth=0;meth<4;meth++){
   t0=omp_get_wtime();
   switch (meth){
      case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m);         strcpy(txt,txt0); break;
      case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m);           strcpy(txt,txt1); break;
      case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m);             strcpy(txt,txt2); break;
      case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m);           strcpy(txt,txt3); break;
      default: ;
   }
   t1=omp_get_wtime();
   printf("method = %s  ",txt);
   /* print_chk(a,ind,m);  */
   printf(" elapsed time = %6.2f\n",t1-t0);
}
print_nonz(a, ind, 2);                                            /* Do something with the results                 */
printf("density = %f %% \n\n",((double)m)/((double)n)*100);       /* Actual nonzero density of array a.            */

/* print_nonz(a, ind, m);    */  /* Uncomment this line to print the indices of the nonzeros.                      */

return 0;
}

/*
With nrep=1000000:
 ./a.out 10016 4 ;    ./a.out 10016 4 ;    ./a.out 10016 48 ;    ./a.out 10016 48 ;    ./a.out 10016 519  ;    ./a.out 10016 519       
With nrep=10000:  
 ./a.out 1000000 5 ;  ./a.out 1000000 5 ;  ./a.out 1000000 52 ;  ./a.out 1000000 52 ;  ./a.out 1000000 513 ;   ./a.out 1000000 513     
*/


该代码使用数组大小​​ n=10016(数据适合 L1 缓存)和 n=1000000 进行了测试,不同的非零密度约为 0.5%、5% 和 50%。为了准确计时,这些函数分别被调用了 1000000 和 10000 次。


Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500
                     0.53%        5.1%       50.0%
arr2ind_movmsk:       0.27        0.53        4.89
arr2ind_pext:         1.44        1.59        1.45
arr2ind_if:           5.93        8.95       33.82
arr2ind_cmov:         6.82        6.83        6.82

Time in seconds, size n=1000000, 1e4 function calls.

                     0.49%        5.1%       50.1%
arr2ind_movmsk:       0.57        2.03        5.37
arr2ind_pext:         1.47        1.47        1.46
arr2ind_if:           5.88        8.98       38.59
arr2ind_cmov:         6.82        6.81        6.81


在这些示例中,矢量化循环比标量循环快。的性能arr2ind_movmsk很大程度上取决于 的密度a。它仅比arr2ind_pext密度足够小时更快。盈亏平衡点还取决于数组大小n。函数“arr2ind_if”显然在 50% 非零密度下分支预测失败。

于 2017-01-31T13:11:54.473 回答
1

如果您希望非零元素的数量非常少(即远小于 1%),那么您可以简单地检查每个 16 字节块是否为非零:

    int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128());
    if (mask != 65535) {
        //store zero bits of mask with scalar code
    }

如果好元素的百分比足够小,则错误预测分支的成本和“if”内的慢标量代码的成本可以忽略不计。


至于一个好的通用解决方案,首先考虑流压缩的 SSE 实现。它从字节数组中删除所有零元素(取自此处的想法):

__m128i shuf [65536]; //must be precomputed
char    cnt  [65536]; //must be precomputed

int compress(const char *src, int len, char *dst) {
    char *ptr = dst;
    for (int i = 0; i < len; i += 16) {
        __m128i reg = _mm_load_si128((__m128i*)&src[i]);
        __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
        int mask = _mm_movemask_epi8(zeroMask);
        __m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]);
        _mm_storeu_si128((__m128i*)ptr, compressed);
        ptr += cnt[mask];   //alternative:   ptr += 16-_mm_popcnt_u32(mask);
    }
    return ptr - dst;
}

如您所见, ( _mm_shuffle_epi8+ 查找表) 可以创造奇迹。我不知道任何其他对结构复杂的代码(如流压缩)进行矢量化的方法。


现在,您的请求唯一剩下的问题是您想要获取索引。每个索引必须以 4 字节值存储,因此 16 个输入字节的块可能会产生多达 64 个字节的输出,这不适合单个 SSE 寄存器。

处理此问题的一种方法是将输出诚实地解压缩为 64 字节。所以你reg在代码中用常量(0,1,2,3,4,...,15)替换,然后将SSE寄存器解压成4个寄存器,并添加一个有四个i值的寄存器。这需要更多的指令:6 条解包指令、4 条添加指令和 3 条存储指令(一个已经存在)。对我来说,这是一个巨大的开销,特别是如果你预计不到 25% 的非零元素。

或者,您可以将单循环迭代处理的非零字节数限制为 4,以便一个寄存器始终足以用于输出。这是示例代码:

__m128i shufMask [65536]; //must be precomputed
char    srcMove  [65536]; //must be precomputed
char    dstMove  [65536]; //must be precomputed

int compress_ids(const char *src, int len, int *dst) {
    const char *ptrSrc = src;
    int *ptrDst = dst;
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15);
    __m128i base = _mm_setzero_si128();
    while (ptrSrc < src + len) {
        __m128i reg = _mm_loadu_si128((__m128i*)ptrSrc);
        __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
        int mask = _mm_movemask_epi8(zeroMask);
        __m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]);
        __m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128());
        ids32 = _mm_add_epi32(ids32, base);
        _mm_storeu_si128((__m128i*)ptrDst, ids32);
        ptrDst += dstMove[mask];    //alternative:   ptrDst += min(16-_mm_popcnt_u32(mask), 4);
        ptrSrc += srcMove[mask];    //no alternative without LUT
        base = _mm_add_epi32(base, _mm_set1_epi32(dstMove[mask]));
    }
    return ptrDst - dst;
}

这种方法的一个缺点是,现在每个后续循环迭代都无法开始,直到ptrDst += dstMove[mask];在前一次迭代中执行该行。所以关键路径急剧增加。硬件超线程或其手动仿真可以消除这种损失。


因此,如您所见,这个基本思想有很多变体,所有这些都以不同程度的效率解决了您的问题。如果您不喜欢它,也可以减小 LUT 的大小(同样,以降低吞吐量性能为代价)。

这种方法不能完全扩展到更宽的寄存器(即 AVX2 和 AVX-512),但您可以尝试将多个连续迭代的指令组合成单个 AVX2 或 AVX-512 指令,从而略微提高吞吐量。

注意:我没有测试任何代码(因为正确预计算 LUT 需要付出很大的努力)。

于 2017-01-11T15:22:54.913 回答
0

虽然 AVX2 指令集有很多 GATHER 指令,但是它的性能太慢了。最有效的方法是手动处理数组。

于 2013-10-02T14:30:17.907 回答