
In my code I have to handle "unmasking" of websocket packets, which essentially means XOR'ing unaligned data of arbitrary length. Thanks to SO (Websocket data unmasking / multi byte xor) I have already found out how to (hopefully) speed this up with the SSE2/AVX2 extensions, but looking at it now, my handling of the unaligned data seems entirely sub-optimal. Is there any way to optimize my code, or at least make it simpler at the same performance, or is it already the fastest it can be?
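For reference, the plain per-byte version this is meant to replace is essentially the following (just a sketch, the helper name is made up):

#include <stddef.h>
#include <stdint.h>

// XOR every payload byte with the matching byte of the 4-byte masking key
static void unmask_naive(uint8_t *data, size_t len, const uint8_t key[4]) {
   for (size_t i = 0; i < len; i++) {
      data[i] ^= key[i & 3];
   }
}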

Here is the important part of the code (for this question I am assuming the data will always be at least long enough to run the AVX2 loop once, but at the same time it will mostly run only a few times at most):

#include <stdint.h>
#include <immintrin.h>  // SSE2/AVX2 intrinsics

// circular shift left for uint32
uint32_t cshiftl_u32(uint32_t num, uint8_t shift) {
   return (num << shift) | (num >> (32 - shift));
}

// circular shift right for uint32
uint32_t cshiftr_u32(uint32_t num, uint8_t shift) {
   return (num >> shift) | (num << (32 - shift));
}

void optimized_xor_32( uint32_t mask, uint8_t *ds, uint8_t *de ) {
   if (ds == de) return; // zero data len -> nothing to do

   uint8_t maskOffset = 0;

// process single bytes till 4 byte alignment ( <= 3 )
   for (; ds < de && ( (uint64_t)ds & (uint64_t)3 ); ds++) {
      *ds ^= *((uint8_t *)(&mask) + maskOffset);
      maskOffset = (maskOffset + 1) & (uint8_t)3;
   }

   if (ds == de) return; // done, return

   if (maskOffset != 0) { // rotate mask so the next key byte (in memory order) ends up in its low byte
      mask = cshiftr_u32(mask, maskOffset * 8); // offset is in bytes, rotate amount is in bits (little-endian)

      maskOffset = 0;
   }

// process 4 byte block till 8 byte alignment ( <= 1 )
   uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)3)); // end of complete 4-byte blocks

   if ( ds < de32 && ( (uint64_t)ds & (uint64_t)7 ) ) {
      *(uint32_t *)ds ^= mask; // mask is uint32_t

      if ((ds += 4) == de) return;
   }

// process 8 byte block till 16 byte alignment ( <= 1 )
   uint64_t mask64 = ((uint64_t)mask << 32) | mask; // replicate the 4-byte mask into 8 bytes
   uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)7)); // end of complete 8-byte blocks

   if ( ds < de64 && ( (uint64_t)ds & (uint64_t)15 ) ) {
      *(uint64_t *)ds ^= mask64;

      if ((ds += 8) == de) return; // done, return
   }


// process 16 byte block till 32 byte alignment ( <= 1) (if supported)
#ifdef CPU_SSE2 
   __m128i v128, v128_mask;
   v128_mask = _mm_set1_epi32(mask);

   uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)15)); // end of complete 16-byte blocks

   if ( ds < de128 && ( (uint64_t)ds & (uint64_t)31 ) ) {
      v128 = _mm_load_si128((__m128i *)ds);
      v128 = _mm_xor_si128(v128, v128_mask);
      _mm_store_si128((__m128i *)ds, v128);

      if ((ds += 16) == de) return; // done, return
   }

#endif
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards)
   __m256i v256, v256_mask;
   v256_mask = _mm256_set1_epi32(mask);

   uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)31)); // end of complete 32-byte blocks

   for (; ds < de256; ds+=32) {
      v256 = _mm256_load_si256((__m256i *)ds);
      v256 = _mm256_xor_si256(v256, v256_mask);
      _mm256_store_si256((__m256i *)ds, v256);
   }

   if (ds == de) return; // done, return
#endif
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported)
   for (; ds < de128; ds+=16) {
      v128 = _mm_load_si128((__m128i *)ds);
      v128 = _mm_xor_si128(v128, v128_mask);
      _mm_store_si128((__m128i *)ds, v128);
   }

   if (ds == de) return; // done, return

#endif
   // process remaining 8 byte blocks 
   // this should always be supported, so remaining can be assumed to be executed <= 1 times
   for (; ds < de64; ds += 8) {
      *(uint64_t *)ds ^= mask64;
   }

   if (ds == de) return; // done, return

   // process remaining 4 byte blocks ( <= 1)
   if (ds < de32) {
      *(uint32_t *)ds ^= mask;

      if ((ds += 4) == de) return; // done, return
   }


   // process remaining bytes ( <= 3)

   for (; ds < de; ds ++) {
      *ds ^= *((uint8_t *)(&mask) + maskOffset);
      maskOffset = (maskOffset + 1) & (uint8_t)3;
   }

}

PS: Please ignore the use of #ifdef instead of cpuid or the like for CPU feature detection.
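For what it's worth, a runtime-dispatch alternative could look roughly like this with GCC/clang's __builtin_cpu_supports; the xor_32_avx2 / xor_32_sse2 / xor_32_scalar names below are hypothetical per-ISA builds of the routine above, not part of the question code:

#include <stdint.h>

// hypothetical per-ISA builds of the kernel above
void xor_32_avx2(uint32_t mask, uint8_t *ds, uint8_t *de);
void xor_32_sse2(uint32_t mask, uint8_t *ds, uint8_t *de);
void xor_32_scalar(uint32_t mask, uint8_t *ds, uint8_t *de);

// pick the widest implementation the CPU actually supports
void unmask(uint32_t mask, uint8_t *ds, uint8_t *de) {
   if (__builtin_cpu_supports("avx2"))
      xor_32_avx2(mask, ds, de);
   else if (__builtin_cpu_supports("sse2"))
      xor_32_sse2(mask, ds, de);
   else
      xor_32_scalar(mask, ds, de);
}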


1 Answer


Contrary to what the manuals say, most Intel processors are actually quite good at handling unaligned data. Since you are using Intel's compiler builtins for vector handling, I assume you have access to a reasonably recent version of icc.
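For example, relying on that, the whole alignment prologue can be dropped in favour of unaligned loads and stores. A rough, untested sketch (AVX2 only, plain scalar tail, function name is mine):

#include <stdint.h>
#include <immintrin.h>

// Unmask with unaligned 32-byte loads/stores; no alignment prologue needed.
// Assumes the first key byte sits in the lowest-addressed byte of mask,
// the same convention as the byte loop in the question.
void unaligned_xor_32(uint32_t mask, uint8_t *ds, uint8_t *de) {
   __m256i vmask = _mm256_set1_epi32(mask);

   for (; de - ds >= 32; ds += 32) {
      __m256i v = _mm256_loadu_si256((const __m256i *)ds);
      _mm256_storeu_si256((__m256i *)ds, _mm256_xor_si256(v, vmask));
   }

   // scalar tail, one byte at a time
   for (uint8_t maskOffset = 0; ds < de; ds++) {
      *ds ^= ((uint8_t *)&mask)[maskOffset];
      maskOffset = (maskOffset + 1) & 3;
   }
}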

If you cannot naturally align your data, then I am afraid what you are doing is as close as you can get to maximum performance. To make the code more readable and deployable on Xeon Phi (64 byte vector registers) / future longer-vector processors, I would suggest you start using Intel Cilk Plus.

Example:

#include <stddef.h>
#include <stdint.h>

void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) {
    while (length & 0x3) {
        *(d++) ^= mask;
        asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
        length--;
    }

    // switch to 4 bytes per block
    uint32_t *_d = (uint32_t *)d;
    length >>= 2;

    // Intel Cilk Plus Array Notation
    // Should expand automatically to the best possible SIMD instructions
    // you are compiling for
    _d[0:length] ^= mask;
}

Please note that I have not tested this code, as I do not have access to the Intel compiler right now. If you run into trouble I can go over it when I am back in my office next week.

If you prefer intrinsics instead, then proper use of preprocessor macros can make your life a lot easier:

#include <stdint.h>
#include <stddef.h>
#include <immintrin.h>

#if defined(__MIC__)
// Intel Xeon Phi
#define VECTOR_BLOCKSIZE 64
// I do not remember the correct types/instructions right now
#error "TODO: MIC handling"
#elif defined(CPU_AVX2)
#define VECTOR_BLOCKSIZE 32
typedef __m256i my_vector_t;
#define VECTOR_LOAD_MASK _mm256_set1_epi32
#define VECTOR_XOR(d, mask) _mm256_store_si256((d), _mm256_xor_si256(_mm256_load_si256(d), (mask)))
#elif defined(CPU_SSE2)
#define VECTOR_BLOCKSIZE 16
typedef __m128i my_vector_t;
#define VECTOR_LOAD_MASK _mm_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si128((d), _mm_xor_si128(_mm_load_si128(d), (mask)))
#else
#define VECTOR_BLOCKSIZE 8
#define VECTOR_LOAD_MASK(mask) (((uint64_t)(mask) << 32) | (mask))
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask)
typedef uint64_t my_vector_t;
#endif

void optimized_xor_32( uint32_t mask, uint8_t *d, size_t length ) {
    size_t i;

    // there really is no point in having extra
    // branches for different vector lengths if they are
    // executed at most once
    // branch prediction is your friend here
    // so we do one byte at a time until the block size
    // is reached

    while (length && ((uintptr_t)d & (VECTOR_BLOCKSIZE - 1))) {
        *(d++) ^= mask;
        asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
        length--;
    }

    my_vector_t * d_vector = (my_vector_t *)d;
    my_vector_t vector_mask = VECTOR_LOAD_MASK(mask);

    size_t vector_length = length / VECTOR_BLOCKSIZE; // compiler will optimise this to a bit shift
    length &= VECTOR_BLOCKSIZE -1; // remaining length

    for (i = 0; i < vector_length; i++) {
      VECTOR_XOR(d_vector + i, vector_mask);
    }

    // process the tail
    d = (uint8_t*)(d_vector + i);
    for (i = 0; i < length; i++) {
      d[i] ^= mask;
      asm ("rold $8, %0" : "+g" (mask) :: "cc");
    }

}
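A quick way to sanity-check any of these variants is to compare against a byte-wise reference on a buffer with an odd length and a misaligned start. My own sketch (it packs the key into mask in memory order, i.e. the first key byte at the lowest address, and assumes it is compiled together with the routine above):

#include <stdio.h>
#include <string.h>

int main(void) {
   uint8_t key[4] = { 0x12, 0x34, 0x56, 0x78 };
   uint8_t a[1021], b[1021];

   for (size_t i = 0; i < sizeof a; i++) a[i] = (uint8_t)i;
   memcpy(b, a, sizeof b);

   uint32_t mask;
   memcpy(&mask, key, 4); // first key byte at the lowest address

   optimized_xor_32(mask, a + 3, sizeof a - 3); // start misaligned on purpose

   for (size_t i = 3; i < sizeof b; i++) b[i] ^= key[(i - 3) & 3];

   puts(memcmp(a, b, sizeof a) ? "MISMATCH" : "ok");
   return 0;
}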

On another note: you may want to use the x86 rotate instruction instead of bit shifts to rotate mask:

#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc")
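Alternatively, at least gcc and clang recognise the plain-C rotate idiom and turn it into a single rol/ror, which avoids inline asm altogether (my sketch):

#include <stdint.h>

// With optimisation enabled this compiles to a single rotate instruction
// on gcc and clang; the masking of n also keeps n == 0 well defined.
static inline uint32_t rotr32(uint32_t v, unsigned n) {
   n &= 31;
   return (v >> n) | (v << ((32 - n) & 31));
}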
answered 2013-09-14 at 11:38