2

我有一个大小为 N 的方形布尔矩阵 M,按行存储,我想计算每列设置为 1 的位数。

例如对于 n=4:

1101
0101
0001
1001

M stored as { { 1,1,0,1}, {0,1,0,1}, {0,0,0,1}, {1,0,0,1} };

result = { 2, 2, 0, 4}; 

我显然可以

  1. 将矩阵 M 转置为矩阵 M'
  2. popcount M' 的每一行。

存在通过位操作进行矩阵转置和弹出计数的良好算法。

我的问题是:是否有可能将这些算法“合并”成一个单一的算法?

请注意,对于 64 位架构,N 可能非常大(比如 1024 或更多)。

4

3 回答 3

2

相关:在许多 64 位位掩码上分别计算每个位位置,使用 AVX 但不是 AVX2https://github.com/mklarqvist/positional-popcount


我有另一个想法,我还没有很好地写完。

Godbolt 链接到没有正确循环边界/清理的混乱工作,但对于大型缓冲区,运行速度比我的 Skylake i7-6700k 上的 @edrezen 版本快约 3 倍,使用 g++7.3 -O3 -march=native . 见test_SWAR_avx2功能。(我知道它不能在 Godbolt 上编译;Agner Fog 的 asmlib.h 不存在。)

我也可能有一些列的顺序错误,但是从单步执行 asm 开始,我认为它做的工作量是正确的。即任何必要的错误修正都不会减慢它的速度。

我使用了 16 位累加器,因此如果您关心的输入大到足以溢出每列 16 位计数器,则可能需要另一个外部循环。

有趣的观察:我的循环的早期错误版本在 中使用sum0123了两次store_globalsums_from_vec16sum4567未使用,因此它在主循环中进行了优化。由于工作量较少,gcc 完全展开了大for(int i=0 ; i<5 ; i++)循环,代码运行速度较慢,例如每个字节大约 1 个周期而不是 0.5 个。对于 uop 缓存或其他东西来说,循环可能太大了(我还没有分析,但前端解码瓶颈会解释它)。出于某种原因,@edrezen 的版本对我来说仅以大约 1.5c/B 的速度运行,而不是答案中报告的 ~1.25。我的 CPU 实际上运行的是 3.9GHz,但 Agner Fog 的库在 4.0 时检测到它,但这还不足以解释它。

此外,gcc 溢出sum4567_16bit到堆栈,所以我们已经在没有 AVX512 的情况下推动了寄存器压力的边界。它不经常更新并且不是问题,但是可能在内循环中需要更多的累加器。


当列数不是 32 时,您的数据布局不清楚。

似乎对于uint32_t32 列的每一块,所有行都连续存储在内存中。即循环遍历列的行是有效的。如果您有超过 32 列,则 32..63 列的行将是连续的,并且位于 0..31 列的所有行之后。

(如果相反,单行的所有列都是连续的,您仍然可以使用这个想法,但可能需要将一些累加器溢出/重新加载到内存中,或者如果编译器做出了不错的选择,则让编译器为您执行此操作。)

因此,加载一个 32 字节(8 个 dword)向量会为一列块获取 8 行数据。这非常方便,允许从 1 位(在内存中)扩大到 2 位累加器,然后在我们扩大到 4 位之前获取更多数据,依此类推,一路求和,这样我们就可以在数据的同时完成大量工作还是稠密的。(而不是仅将每个字节添加 1 位(0 或 1)到向量累加器。)

我们展开的越多,我们可以从内存中获取的数据就越多,以更好地利用向量中的编码空间。即我们的变量具有更高的熵。每个或解包/洗牌指令抛出更多数据(就促成它的内存位而言)vpaddb/w/d/q是一件好事。

SIMD 向量中小于 1 字节的累加器基本上是一种 https://en.wikipedia.org/wiki/SWAR技术,您必须在其中 AND 远离您移过元素边界的位,因为我们没有 SIMD 元素为我们做这件事的界限。(而且我们无论如何都要避免溢出,所以 ADD 进入下一个元素不是问题。)

每个内部循环迭代:

  • 从 2 或 3(组)行中的每一行的相同列中获取数据向量。因此,您要么从一块 32 列中获得 3 * 8 行,要么从 256 列中获得 3 行。

  • 用 ( )屏蔽它们set1(0b01010101)以获得偶数(低)位,并用(vec>>1) & mask( _mm256_srli_epi32(v,1)) 获得奇数(高)位。用于_mm256_add_epi8在这些 2 位累加器中累加。它们不能只用 3 个溢出,所以进位传播边界实际上并不重要。

    向量的每个字节都有 4 个独立的垂直和,并且有两个向量(奇数/偶数)。

  • 再次重复上述操作,从内存中的 3 个数据向量中获取另一对向量。

  • 再次组合得到 4 个 4 位累加器向量(可能值为 0..6)。当然,仍然没有在单个 32 位元素中混合位,因为我们绝不能这样做。移位仅将奇数/高列的位移动到包含它们的 2 位或 4 位单元的底部,因此可以将它们与在其他向量中以相同方式移动的位相加。

  • _mm256_unpacklo/hi_epi8和 mask 或 shift+m​​ask 以获得 8 位累加器

  • 将上述内容放在一个最多运行 5 次的循环中,因此 0..12 累加器的值上升到 0..60(即留出 2 位空间用于解压 8 位累加器,使用它们的所有编码空间。)


如果您的答案中有数据布局,那么我们可以在同一个 vector 中添加来自 dword 元素的数据。我们可以这样做,以便在将累加器扩大到 16 位时不会用完寄存器(因为 x86-64 只有 16 个 YMM 寄存器,我们需要一些用于常量。)

  • _mm256_unpacklo/hi_epi16并添加,以交错成对的 8 位计数器,因此同一列的一组计数器已从 dword 扩展为 qword。

重复这个一般想法以减少__m256i累加器分布的寄存器(或变量)的数量。

有效地处理缺少车道交叉的 2 输入字节或字混洗是不方便的,但它只是整个工作的一小部分。 vextracti128/ vpaddb xmm->vpmovzxbw工作得很好。

于 2018-08-04T11:19:27.940 回答
1

我在这两种方法之间做了一些基准测试:

  1. 转置+弹出计数
  2. 逐行更新

我为这两种方法编写了一个简单的版本和一个 AVX2 版本。我为 AVX2“transpose+popcount”方法使用了一些函数(在 stackoverflow 或其他地方找到)。

在我的测试中,我假设输入是nbRowsx32位压缩格式的矩阵(nbRows 本身是 32 的倍数);因此,矩阵存储为 的数组uint32_t

代码如下:

#include <cinttypes>
#include <cstdio>
#include <cstring>
#include <cmath>
#include <cassert>
#include <chrono>
#include <immintrin.h>
#include <asmlib.h>

using namespace std;
using namespace std::chrono;

// see https://stackoverflow.com/questions/24225786/fastest-way-to-unpack-32-bits-to-a-32-byte-simd-vector
static __m256i expand_bits_to_bytes (uint32_t x);

// see https://mischasan.wordpress.com/2011/10/03/the-full-sse2-bit-matrix-transpose-routine/
static void sse_trans(char const *inp, char *out);

static double deviation (double n, double sum2, double sum);

////////////////////////////////////////////////////////////////////////////////
// Naive approach (matrix transposition)
////////////////////////////////////////////////////////////////////////////////
void test_transpose_popcnt_naive (uint64_t nbRows, const uint32_t* bitmap, uint64_t*  globalSums)
{
    assert (nbRows%32==0);

    uint8_t transpo[32][32];  memset (transpo, 0, sizeof(transpo));

    for (uint64_t k=0; k<nbRows; k+=32)
    {
        // We unpack and transpose the input into a 32x32 bytes matrix
        for (size_t row=0; row<32; row++)
        {
            for (size_t col=0; col<32; col++)  {  transpo[col][row] = (bitmap[k+row] >> col) & 1 ;  }
        }

        for (size_t row=0; row<32; row++)
        {
            // We popcount the current row
            u_int8_t sum=0;
            for (size_t col=0; col<32; col++)  {  sum += transpo[row][col];  }

            // We update the corresponding global sum
            globalSums[row] += sum;
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// Naive approach (row by row)
////////////////////////////////////////////////////////////////////////////////
void test_update_row_by_row_naive (uint64_t nbRows, const uint32_t* bitmap, uint64_t*  globalSums)
{
    for (uint64_t row=0; row<nbRows; row++)
    {
        for (size_t col=0; col<32; col++)
        {
            globalSums[col] += (bitmap[row] >> col) & 1;
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// AVX2 (matrix transposition + popcount)
////////////////////////////////////////////////////////////////////////////////
void test_transpose_popcnt_avx2 (uint64_t nbRows, const uint32_t* bitmap, uint64_t*  globalSums)
{
    assert (nbRows%32==0);

    uint32_t transpo[32];

    const uint32_t* loop = bitmap;
    for (uint64_t k=0; k<nbRows; loop+=32, k+=32)
    {
        // We transpose the input as a 32x32 bytes matrix
        sse_trans ((const char*)loop, (char*)transpo);

        // We update the global sums
        for (size_t i=0; i<32; i++)
        {
            globalSums[i] += __builtin_popcount (transpo[i]);
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// AVX2 approach (update totals row by row)
////////////////////////////////////////////////////////////////////////////////

// Note: we use template specialization to unroll some portions of a loop
template<int N>
void UpdateLocalSums (__m256i& localSums, const uint32_t* bitmap, uint64_t& k)
{
    // We update the local sums with the current row
    localSums = _mm256_sub_epi8 (localSums, expand_bits_to_bytes (bitmap[k++]));

    // Go recursively
    UpdateLocalSums<N-1>(localSums, bitmap, k);
}

template<>
void UpdateLocalSums<0> (__m256i& localSums, const uint32_t* bitmap, uint64_t& k)
{
}

// Dillon Davis proposal: use 4 registers holding uint32_t values and update them from local sums with AVX2
#define USE_AVX2_FOR_GRAND_TOTALS 1

void test_update_row_by_row_avx2 (uint64_t nbRows, const uint32_t* bitmap, uint64_t*  globalSums)
{
    union U256i {  __m256i v;   uint8_t a[32];  uint32_t b[8];  };

    // We use 1 register for updating local totals
    __m256i   localSums = _mm256_setzero_si256();

#ifdef USE_AVX2_FOR_GRAND_TOTALS
    // Dillon Davis proposal: use 4 registers holding uint32_t values and update them from local sums with AVX2
    __m256i   globalSumsReg[4];  for (size_t r=0; r<4; r++)  {   globalSumsReg[r] = _mm256_setzero_si256(); }
#endif

    uint64_t steps = nbRows / 255;
    uint64_t k=0;

    const int divisorOf255 = 5;

    // We iterate over all rows
    for (uint64_t i=0; i<steps; i++)
    {
        // we update the local totals (255*32=8160 additions)
        for (int j=0; j<255/divisorOf255; j++)
        {
            // unroll some portion of the 255 loop through template specialization
            UpdateLocalSums<divisorOf255>(localSums, bitmap, k);
        }

#ifdef USE_AVX2_FOR_GRAND_TOTALS
        // Dillon Davis proposal: use 4 registers holding uint32_t values and update them from local sums

        // We take the 128 high bits of the local sums
        __m256i   localSums2 = _mm256_broadcastsi128_si256(_mm256_extracti128_si256(localSums,1));

        globalSumsReg[0] = _mm256_add_epi32 (globalSumsReg[0],
            _mm256_cvtepu8_epi32 (_mm256_castsi256_si128 (_mm256_srli_si256(localSums, 0)))
        );
        globalSumsReg[1] = _mm256_add_epi32 (globalSumsReg[1],
            _mm256_cvtepu8_epi32 (_mm256_castsi256_si128 (_mm256_srli_si256(localSums, 8)))
        );
        globalSumsReg[2] = _mm256_add_epi32 (globalSumsReg[2],
            _mm256_cvtepu8_epi32 (_mm256_castsi256_si128 (_mm256_srli_si256(localSums2, 0)))
        );
        globalSumsReg[3] = _mm256_add_epi32 (globalSumsReg[3],
            _mm256_cvtepu8_epi32 (_mm256_castsi256_si128 (_mm256_srli_si256(localSums2, 8)))
        );
#else
        // we update the global totals
        U256i tmp = { localSums };
        for (size_t k=0; k<32; k++)  {  globalSums[k] += tmp.a[k];  }
#endif
        // we reset the local totals
        localSums = _mm256_setzero_si256();
    }

#ifdef USE_AVX2_FOR_GRAND_TOTALS
    // We update the global totals into the final uint32_t array
    for (size_t r=0; r<4; r++)
    {
        U256i tmp = { globalSumsReg[r] };
        for (size_t k=0; k<8; k++)  {  globalSums[r*8+k] += tmp.b[k];  }
    }
#endif

    // we update the remaining local totals
    for (uint64_t i=steps*255; i<nbRows; i++)
    {
        UpdateLocalSums<1>(localSums, bitmap, k);
    }

    // we update the global totals
    U256i tmp = { localSums };
    for (size_t k=0; k<32; k++)  {  globalSums[k] += tmp.a[k];  }
}

////////////////////////////////////////////////////////////////////////////////
void execute (
    const char* name,
    void (*fct)(uint64_t nbRows, const uint32_t* bitmap, uint64_t*  globalSums),
    size_t nbRuns,
    uint64_t nbRows,
    u_int32_t* bitmap
)
{
    uint64_t  sums[32];

    double timeTotal=0;
    double cycleTotal=0;
    double timeTotal2=0;
    double cycleTotal2=0;
    uint64_t check=0;

    for (size_t n=0; n<nbRuns; n++)
    {
        memset(sums,0,sizeof(sums));

        // We want both time and cpu cycles information
        milliseconds t0 = duration_cast< milliseconds >(system_clock::now().time_since_epoch());
        uint64_t c0 = ReadTSC();

        // We run the test
        (*fct) (nbRows, bitmap, sums);

        uint64_t c1 = ReadTSC();
        milliseconds t1 = duration_cast< milliseconds >(system_clock::now().time_since_epoch());

        timeTotal  += (t1-t0).count();
        cycleTotal += (double)(c1-c0) / nbRows;

        timeTotal2  += (t1-t0).count() * (t1-t0).count();
        cycleTotal2 += ((double)(c1-c0) / nbRows) * ((double)(c1-c0) / nbRows);

        // We compute some dummy checksum
        for (size_t k=0; k<32; k++)  {  check += sums[k];  }
    }

    printf ("%-21s |  %5.0lf (%5.1lf)            |  %5.2lf (%4.2lf)          |  %.3lf           |  0x%lx\n",
        name,
        timeTotal / nbRuns,
        deviation (nbRuns, timeTotal2, timeTotal),
        cycleTotal/nbRuns,
        deviation (nbRuns, cycleTotal2, cycleTotal),
        check,
        nbRows * cycleTotal / timeTotal / 1000000.0
    );
}

////////////////////////////////////////////////////////////////////////////////
int main(int argc, char **argv)
{
    // We set rows number as 2^n where n is the provided argument
    // For simplification, we assume that the rows number is a multiple of 32
    uint64_t nbRows = 1ULL << (argc>1 ? atoi(argv[1]) : 28);
    size_t   nbRuns = argc>2 ? atoi(argv[2]) : 10;

    // We build an bitmap of size nbRows*32
    uint32_t* bitmap = new uint32_t[nbRows];
    if (bitmap==nullptr)
    {
        fprintf(stderr, "unable to allocate the bitmap\n");
        exit(1);
    }

    // We fill the bitmap with random values
    srand(time(nullptr));
    for (uint64_t i=0; i<nbRows; i++)    {  bitmap[i] = rand() & 0xFFFFFFFF;  }

    printf ("\n");
    printf ("nbRows=%ld  nbRuns=%ld\n", nbRows, nbRuns);
    printf ("------------------------------------------------------------------------------------------------------------\n");
    printf ("name                  | time in msec : mean (sd)  | cycles/row : mean (sd) | frequency in GHz | checksum\n");
    printf ("------------------------------------------------------------------------------------------------------------\n");

    // We launch the benchmark
    execute ("naive (transpo)   ", test_transpose_popcnt_naive,  nbRuns, nbRows, bitmap);
    execute ("naive (row by row)", test_update_row_by_row_naive, nbRuns, nbRows, bitmap);
    execute ("AVX2  (transpo)   ", test_transpose_popcnt_avx2,   nbRuns, nbRows, bitmap);
    execute ("AVX2  (row by row)", test_update_row_by_row_avx2,  nbRuns, nbRows, bitmap);

    printf ("\n");

    // Some clean up
    delete[] bitmap;

    return EXIT_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////
__m256i expand_bits_to_bytes(uint32_t x)
{
    __m256i xbcast = _mm256_set1_epi32(x);

    // Each byte gets the source byte containing the corresponding bit
    __m256i shufmask = _mm256_set_epi64x(
        0x0303030303030303, 0x0202020202020202,
        0x0101010101010101, 0x0000000000000000);
    __m256i shuf     = _mm256_shuffle_epi8(xbcast, shufmask);
    __m256i andmask  = _mm256_set1_epi64x(0x8040201008040201);  // every 8 bits -> 8 bytes, pattern repeats.
    __m256i isolated_inverted = _mm256_and_si256(shuf, andmask);

    // Avoid an _mm256_add_epi8 thanks to Peter Cordes's comment
    return _mm256_cmpeq_epi8(isolated_inverted, andmask);
}

////////////////////////////////////////////////////////////////////////////////
void sse_trans(char const *inp, char *out)
{
#define INP(x,y) inp[(x)*4 + (y)/8]
#define OUT(x,y) out[(y)*4 + (x)/8]

    int rr, cc, i, h;
    union { __m256i x; uint8_t b[32]; } tmp;

    for (cc = 0; cc < 32; cc += 8)
    {
        for (i = 0; i < 32; ++i)
            tmp.b[i] = INP(i, cc);

        for (i = 8; i--; tmp.x = _mm256_slli_epi64(tmp.x, 1))
            *(uint32_t*)&OUT(0, cc + i) = _mm256_movemask_epi8(tmp.x);
    }
}

////////////////////////////////////////////////////////////////////////////////
double deviation (double n, double sum2, double sum)  {  return sqrt (sum2/n - (sum/n)*(sum/n)); }

一些备注:

  • 我使用 Agner Fog 的 asmlib 有一个返回 CPU 周期的函数
  • 编译命令是g++ -O3 -march=native ../Test.cpp -o ./Test -laelf64
  • gcc 版本是 7.3.1
  • CPU 是 Intel(R) Core(TM) i7-6700HQ CPU @ 2.60GHz
  • 我计算了一些虚拟校验和来比较不同测试的结果

现在结果:

------------------------------------------------------------------------------------------------------------
name                  | time in msec : mean (sd)  | cycles/row : mean (sd) | frequency in GHz | checksum
------------------------------------------------------------------------------------------------------------
naive (transpo)       |   4548 ( 36.5)            |  43.91 (0.35)          |  2.592           |  0x9affeb5a6
naive (row by row)    |   3033 ( 11.0)            |  29.29 (0.11)          |  2.592           |  0x9affeb5a6
AVX2  (transpo)       |    767 ( 12.8)            |   7.40 (0.12)          |  2.592           |  0x9affeb5a6
AVX2  (row by row)    |    130 (  4.0)            |   1.25 (0.04)          |  2.591           |  0x9affeb5a6

所以看来AVX2中的“逐行”是目前为止最好的。

请注意,当我看到这个结果(每行少于 2 个周期)时,我没有再努力优化 AVX2“transpose+popcount”方法,这应该可以通过并行计算几个 popcount 来实现(我可能稍后测试它)。

于 2018-08-02T11:23:42.140 回答
1

我最终按照 Peter Cordes 提出的高熵 SWAR 方法编写了另一个实现。此实现是递归的并且依赖于 C++ 模板特化。

全局的想法是在没有进位溢出的情况下将 N 位累加器填充到最大值(这是使用递归的地方)。当这些累加器被填满时,我们会更新总计,并重新开始填充新的 N 位累加器,直到处理完所有行。

这是代码(见函数test_SWAR_recursive):

#include <immintrin.h>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <cmath>
#include <cstdio>
#include <cstring>

using namespace std;
using namespace std::chrono;

// avoid the #include <asmlib.h>
extern "C" u_int64_t ReadTSC();

static double deviation (double n, double sum2, double sum)  {  return sqrt (sum2/n - (sum/n)*(sum/n)); }

////////////////////////////////////////////////////////////////////////////////
// Recursive SWAR approach (with template specialization)
////////////////////////////////////////////////////////////////////////////////

template<int DEPTH>
struct RecursiveSWAR
{
    // Number of accumulators for current depth
    static const int N = 1<<DEPTH;

    // Array of N-bit accumulators
    typedef __m256i Array[N];

    // Magic numbers (0x55555555, 0x33333333, ...) computed recursively
    static const u_int32_t MAGIC_NUMBER =
        RecursiveSWAR<DEPTH-1>::MAGIC_NUMBER
            * (1 + (1<<(1<<(DEPTH-1))))
            / (1 + (1<<(1<<(DEPTH+0))));

    static void fillAccumulators (u_int32_t*& begin, const u_int32_t* end, Array accumulators)
    {
        // We reset the N-bit accumulators
        for (int i=0; i<N; i++)  {  accumulators[i] = _mm256_setzero_si256();  }

        // We check (only for depth big enough) that we have still rows to process
        if (DEPTH>=3)  if (begin>=end)  { return; }

        typename RecursiveSWAR<DEPTH-1>::Array accumulatorsMinusOne;

        // We load a register with the mask
        __m256i mask = _mm256_set1_epi32 (RecursiveSWAR<DEPTH-1>::MAGIC_NUMBER);

        // We fill the N-bit accumulators to their maximum capacity without carry overflow
        for (int i=0; i<N+1; i++)
        {
            // We fill (N-1)-bit accumulators recursively
            RecursiveSWAR<DEPTH-1>::fillAccumulators (begin, end, accumulatorsMinusOne);

            // We update the N-bit accumulators from the (N-1)-bit accumulators
            for (int j=0; j<RecursiveSWAR<DEPTH-1>::N; j++)
            {
                // LOW part
                accumulators[2*j+0] = _mm256_add_epi32 (
                    accumulators[2*j+0],
                    _mm256_and_si256 (
                        accumulatorsMinusOne[j],
                        mask
                    )
                );

                // HIGH part
                accumulators[2*j+1] = _mm256_add_epi32 (
                    accumulators[2*j+1],
                    _mm256_and_si256 (
                        _mm256_srli_epi32 (
                            accumulatorsMinusOne[j],
                            RecursiveSWAR<DEPTH-1>::N
                        ),
                        mask
                    )
                );
            }
        }
    }
};

// Template specialization for DEPTH=0
template<>
struct RecursiveSWAR<0>
{
    static const int N = 1;

    typedef __m256i Array[N];

    static const u_int32_t MAGIC_NUMBER = 0x55555555;

    static void fillAccumulators (u_int32_t*& begin, const u_int32_t* end, Array result)
    {
        // We just load 8 rows in the AVX2 register
        result[0] = _mm256_loadu_si256 ((__m256i*)begin);

        // We update the iterator
        begin += 1*sizeof(__m256i)/sizeof(u_int32_t);
    }
};

template<int DEPTH> struct TypeInfo  { };
template<> struct TypeInfo<3>  {  typedef u_int8_t  Type; };
template<> struct TypeInfo<4>  {  typedef u_int16_t Type; };
template<> struct TypeInfo<5>  {  typedef u_int32_t Type; };

unsigned char reversebits (unsigned char b)
{
    return ((b * 0x80200802ULL) & 0x0884422110ULL) * 0x0101010101ULL >> 32;
}

void test_SWAR_recursive (uint64_t nbRows, const uint32_t* bitmap, uint32_t*  globalSums)
{
    static const int DEPTH = 4;

    RecursiveSWAR<DEPTH>::Array accumulators;

          uint32_t* begin = (uint32_t*) bitmap;
    const uint32_t* end   = bitmap + nbRows;

    // We reset the grand totals
    for (int i=0; i<32; i++)  { globalSums[i] = 0; }

    while (begin < end)
    {
        // We fill the N-bit accumulators to the maximum without overflow
        RecursiveSWAR<DEPTH>::fillAccumulators (begin, end, accumulators);

        // We update grand totals from the filled N-bit accumulators
        for (int i=0; i<RecursiveSWAR<DEPTH>::N; i++)
        {
            int r = reversebits(i) >> (8-DEPTH);
            u_int32_t* sums   = globalSums+r;
            TypeInfo<DEPTH>::Type*  values = (TypeInfo<DEPTH>::Type*) (accumulators+i);

            for (int j=0; j<8*(1<<(5-DEPTH)); j++)
            {
                sums[(j*RecursiveSWAR<DEPTH>::N) % 32] += values[j];
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
void execute (
    const char* name,
    void (*fct)(uint64_t nbRows, const uint32_t* bitmap, uint32_t*  globalSums),
    size_t nbRuns,
    uint64_t nbRows,
    u_int32_t* bitmap
)
{
    uint32_t  sums[32];

    double timeTotal=0;
    double cycleTotal=0;
    double timeTotal2=0;
    double cycleTotal2=0;
    uint64_t check=0;

    for (size_t n=0; n<nbRuns; n++)
    {
        memset(sums,0,sizeof(sums));

        // We want both time and cpu cycles information
        milliseconds t0 = duration_cast< milliseconds >(system_clock::now().time_since_epoch());
        uint64_t c0 = ReadTSC();

        // We run the test
        (*fct) (nbRows, bitmap, sums);

        uint64_t c1 = ReadTSC();
        milliseconds t1 = duration_cast< milliseconds >(system_clock::now().time_since_epoch());

        timeTotal  += (t1-t0).count();
        cycleTotal += (double)(c1-c0) / nbRows;

        timeTotal2  += (t1-t0).count() * (t1-t0).count();
        cycleTotal2 += ((double)(c1-c0) / nbRows) * ((double)(c1-c0) / nbRows);

        // We compute some dummy checksum
        for (size_t k=0; k<32; k++)  {  check += (k+1)*sums[k];  }
    }

    printf ("%-21s |  %5.0lf (%5.1lf)            |  %5.2lf (%5.3lf)         |  %.3lf           |  0x%lx\n",
        name,
        timeTotal / nbRuns,
        deviation (nbRuns, timeTotal2, timeTotal),
        cycleTotal/nbRuns,
        deviation (nbRuns, cycleTotal2, cycleTotal),
        nbRows * cycleTotal / timeTotal / 1000000.0,
        check/nbRuns
    );
}


////////////////////////////////////////////////////////////////////////////////
int main(int argc, char **argv)
{
    // We set rows number as 2^n where n is the provided argument
    // For simplification, we assume that the rows number is a multiple of 32
    uint64_t nbRows = 1ULL << (argc>1 ? atoi(argv[1]) : 28);
    size_t   nbRuns = argc>2 ? atoi(argv[2]) : 10;

    // We build an bitmap of size nbRows*32
    uint64_t actualNbRows = nbRows + 100000;
    uint32_t* bitmap = (uint32_t*)_mm_malloc(sizeof(uint32_t)*actualNbRows, 256);
    if (bitmap==nullptr)
    {
        fprintf(stderr, "unable to allocate the bitmap\n");
        exit(1);
    }
    memset (bitmap, 0, sizeof(u_int32_t)*actualNbRows);

    // We fill the bitmap with random values
    //    srand(time(nullptr));
    for (uint64_t i=0; i<nbRows; i++)    {  bitmap[i] = rand() & 0xFFFFFFFF;  }


    printf ("\n");
    printf ("nbRows=%ld  nbRuns=%ld\n", nbRows, nbRuns);
    printf ("------------------------------------------------------------------------------------------------------------\n");
    printf ("name                  | time in msec : mean (sd)  | cycles/row : mean (sd) | frequency in GHz | checksum\n");
    printf ("------------------------------------------------------------------------------------------------------------\n");

    // We launch the benchmark
    execute ("AVX2  (SWAR rec)  ", test_SWAR_recursive,          nbRuns, nbRows, bitmap);

    printf ("\n");

    // Some clean up
    _mm_free (bitmap);

    return EXIT_SUCCESS;
}

在此代码中,累加器的大小为 2 DEPTH 。请注意,此实现在 DEPTH=5 之前有效。对于 DEPTH=4,以下是与 Peter Cordes(命名为高熵 SWAR)的实现相比的性能结果:

图形

该图给出了处理一行(32 个项目)所需的周期数,作为矩阵行数的函数。正如预期的那样,结果非常相似,因为主要思想是相同的。值得注意的是图表的三个部分:

  • log2(n)<=20 的常数值
  • log2(n) 的值在 20 和 22 之间增加
  • log2(n)>=22 的常数值

我猜 CPU 缓存属性可以解释这种行为。

于 2018-08-26T13:45:33.340 回答