3

以下代码对两个std::vectors v1和进行操作v2,每个都包含多个 128 元素向量。通过外部向量的循环(使用i1i2)包含一个内部循环,旨在限制执行进一步复杂处理的组合i1i2。过滤掉了大约 99.9% 的组合。

不幸的是,过滤循环是我的程序中的一个主要瓶颈——分析表明,整个运行时间的 26% 都花在了在线上if(a[k] + b[k] > LIMIT)

const vector<vector<uint16_t>> & v1 = ...
const vector<vector<uint16_t>> & v2 = ...

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000
    for(size_t i2 = 0; i2 < v2.size(); ++i2) {

        const vector<uint16_t> & a = v1[i1];
        const vector<uint16_t> & b = v2[i2];

        bool good = true;
        for(std::size_t k = 0; k < 128; ++k) {
            if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000
                good = false;
                break;
            }
        }
        if(!good) continue;

        // Further processing involving i1 and i2
    }
}

我认为可以通过增加内存局部性以及向量化来提高这段代码的性能。关于如何做到这一点或可以做出的其他改进的任何建议?

4

4 回答 4

3

您可以将 SIMD 应用于内部循环:

    bool good = true;
    for(std::size_t k = 0; k < 128; ++k) {
        if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000
            good = false;
            break;
        }

如下:

#include <emmintrin.h>  // SSE2 intrinsics
#include <limits.h>     // SHRT_MIN

// ...

    // some useful constants - declare these somewhere before the outermost loop

    const __m128i vLIMIT = _mm_set1_epi16(LIMIT + SHRT_MIN); // signed version of LIMIT
    const __m128i vOFFSET = _mm_set1_epi16(SHRT_MIN);        // offset for uint16_t -> int16_t conversion

// ...

    bool good = true;
    for(std::size_t k = 0; k < 128; k += 8) {
        __m128i v, va, vb;              // iterate through a, b, 8 elements at a time
        int mask;
        va = _mm_loadu_si128(&a[k]);    // get 8 elements from a[k], b[k]
        vb = _mm_loadu_si128(&b[k]);
        v = _mm_add_epi16(va, vb);      // add a and b vectors
        v = _mm_add_epi16(v, vOFFSET);  // subtract 32768 to make signed
        v = _mm_cmpgt_epi16(v, vLIMIT); // compare against LIMIT
        mask = _mm_maskmove_epi8(v);    // get comparison results as 16 bit mask
        if (mask != 0) {                // if any value exceeded limit
            good = false;               // clear good flag and exit loop
            break;
        }

警告:未经测试的代码 - 可能需要调试,但一般方法应该是合理的。

于 2013-10-08T13:53:29.450 回答
1

您已经获得了最有效的访问模式 for v1,但是您正在按顺序扫描所有v2外部循环的每次迭代。这是非常低效的,因为v2访问会不断导致(L2 可能还有 L3)缓存未命中。

更好的访问模式是增加循环嵌套,以便外部循环跨越v1and v2,内部循环处理两者的子段中的元素,v1并且v2足够小以适合缓存。

基本上,而不是

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000
    for(size_t i2 = 0; i2 < v2.size(); ++i2) {

for(size_t i2a = 0; i2a < v2.size(); i2a += 32) {
    for(size_t i1 = 0; i1 < v1.size(); ++i1) {
        for(size_t i2 = i2a; i2 < v2.size() && i2 < i2a + 32; ++i2) {

或者

size_t i2a = 0;

// handle complete blocks
for(; i2a < v2.size() - 31; i2a += 32) {
    for(size_t i1 = 0; i1 < v1.size(); ++i1) {
        for(size_t i2 = i2a; i2 < i2a + 32; ++i2) {

        }
    }
}

// handle leftover partial block
for(size_t i1 = 0; i1 < v1.size(); ++i1) {
    for(size_t i2 = i2a; i2 < v2.size(); ++i2) {
    }
}

这样,一大块32 * 128 * sizeof (uint16_t)字节或 8kB 将从v2缓存中加载,然后重复使用 20,000 次。

这种改进与 SIMD (SSE) 矢量化正交。它将与基于线程的并行性交互,但可能以一种很好的方式。

于 2013-10-11T16:49:17.560 回答
0

首先,一个简单的优化可以是这样,但编译器可以自己完成,所以我不确定它可以改进多少:

for(std::size_t k = 0; k < 128 && good; ++k)
{
    good = a[k] + b[k] <= LIMIT;
}

其次,我认为最好将好的结果保留在第二个向量中,因为涉及 i1 和 i2 的任何处理都可能破坏 CPU 缓存。

第三,这可能是主要的优化,我认为您可以将第二个 for 循环重写为: for(size_t i2 = i1; i2 < v2.size(); ++i2) 因为您对 a 和 b 使用 + 操作可交换向量,因此 i1 和 i2 的结果将与 i2 和 i1 相同。为此,您需要使 v1 和 v2 大小相同。如果大小不同,则需要以不同的方式编写迭代。

Fort,据我所知,您正在处理两个矩阵,最好保留元素向量而不是向量向量。

希望这可以帮助。拉兹万。

于 2013-10-08T10:03:34.630 回答
0

几点建议:

  • 正如评论中所建议的,将内部 128 元素向量替换为数组以获得更好的内存局部性。
  • 这段代码看起来高度可并行化,你试过吗?您可以拆分组合以在所有可用内核上进行过滤,然后重新平衡收集的工作并将处理拆分到所有内核上。

我使用数组实现了一个版本,用于内部 128 个元素,PPL 用于并行化(需要 VS 2012 或更高版本)和一些 SSE 代码用于过滤,并获得了相当显着的加速。根据“进一步处理”所涉及的具体内容,稍微不同地构建事物可能会带来好处(例如,在此示例中,我不会在过滤后重新平衡工作)。

更新:我实现了 Ben Voigt 建议的缓存阻塞,并获得了更多的加速。

#include <vector>
#include <array>
#include <random>
#include <limits>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <chrono>
#include <iterator>

#include <ppl.h>

#include <immintrin.h>

using namespace std;
using namespace concurrency;

namespace {
const int outerVecSize = 20000;
const int innerVecSize = 128;
const int LIMIT = 16000;
auto engine = default_random_engine();
};

typedef vector<uint16_t> InnerVec;
typedef array<uint16_t, innerVecSize> InnerArr;

template <typename Cont> void randomFill(Cont& c) {
    // We want approx 0.1% to pass filter, the mean and standard deviation are chosen to get close to that
    static auto dist = normal_distribution<>(LIMIT / 4.0, LIMIT / 4.6);
    generate(begin(c), end(c), [] {
        auto clamp = [](double x, double minimum, double maximum) { return min(max(minimum, x), maximum); };
        return static_cast<uint16_t>(clamp(dist(engine), 0.0, numeric_limits<uint16_t>::max()));
    });
}

void resizeInner(InnerVec& v) { v.resize(innerVecSize); }
void resizeInner(InnerArr& a) {}

template <typename Inner> Inner generateRandomInner() {
    auto inner = Inner();
    resizeInner(inner);
    randomFill(inner);
    return inner;
}

template <typename Inner> vector<Inner> generateRandomInput() {
    auto outer = vector<Inner>(outerVecSize);
    generate(begin(outer), end(outer), generateRandomInner<Inner>);
    return outer;
}

void Report(const chrono::high_resolution_clock::duration elapsed, size_t in1Size, size_t in2Size,
            const int passedFilter, const uint32_t specialValue) {
    cout << passedFilter << "/" << in1Size* in2Size << " ("
         << 100.0 * (double(passedFilter) / double(in1Size * in2Size)) << "%) passed filter\n";
    cout << specialValue << "\n";
    cout << "Elapsed time = " << chrono::duration_cast<chrono::milliseconds>(elapsed).count() << "ms" << endl;
}

void TestOriginalVersion() {
    cout << __FUNCTION__ << endl;

    engine.seed();
    const auto v1 = generateRandomInput<InnerVec>();
    const auto v2 = generateRandomInput<InnerVec>();

    int passedFilter = 0;
    uint32_t specialValue = 0;

    auto startTime = chrono::high_resolution_clock::now();

    for (size_t i1 = 0; i1 < v1.size(); ++i1) { // v1.size() and v2.size() about 20000
        for (size_t i2 = 0; i2 < v2.size(); ++i2) {

            const vector<uint16_t>& a = v1[i1];
            const vector<uint16_t>& b = v2[i2];

            bool good = true;
            for (std::size_t k = 0; k < 128; ++k) {
                if (static_cast<int>(a[k]) + static_cast<int>(b[k])
                    > LIMIT) { // LIMIT is a const uint16_t: approx 16000
                    good = false;
                    break;
                }
            }
            if (!good) continue;

            // Further processing involving i1 and i2
            ++passedFilter;
            specialValue += inner_product(begin(a), end(a), begin(b), 0);
        }
    }

    auto endTime = chrono::high_resolution_clock::now();

    Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue);
}

bool needsProcessing(const InnerArr& a, const InnerArr& b) {
    static_assert(sizeof(a) == sizeof(b) && (sizeof(a) % 16) == 0, "Array size must be multiple of 16 bytes.");
    static const __m128i mmLimit = _mm_set1_epi16(LIMIT);
    static const __m128i mmLimitPlus1 = _mm_set1_epi16(LIMIT + 1);
    static const __m128i mmOnes = _mm_set1_epi16(-1);

    auto to_m128i = [](const uint16_t* p) { return reinterpret_cast<const __m128i*>(p); };
    return equal(to_m128i(a.data()), to_m128i(a.data() + a.size()), to_m128i(b.data()), [&](const __m128i& a, const __m128i& b) {
        // avoid overflow due to signed compare by clamping sum to LIMIT + 1
        const __m128i clampSum = _mm_min_epu16(_mm_adds_epu16(a, b), mmLimitPlus1);
        return _mm_test_all_zeros(_mm_cmpgt_epi16(clampSum, mmLimit), mmOnes);
    });
}

void TestArrayParallelVersion() {
    cout << __FUNCTION__ << endl;

    engine.seed();
    const auto v1 = generateRandomInput<InnerArr>();
    const auto v2 = generateRandomInput<InnerArr>();

    combinable<int> passedFilterCombinable;
    combinable<uint32_t> specialValueCombinable;

    auto startTime = chrono::high_resolution_clock::now();

    const size_t blockSize = 64;

    parallel_for(0u, v1.size(), blockSize, [&](size_t i) {
        for (const auto& b : v2) {
            const auto blockBegin = begin(v1) + i;
            const auto blockEnd = begin(v1) + min(v1.size(), i + blockSize);
            for (auto it = blockBegin; it != blockEnd; ++it) {
                const InnerArr& a = *it;
                if (!needsProcessing(a, b))
                    continue;

                // Further processing involving a and b
                ++passedFilterCombinable.local();
                specialValueCombinable.local() += inner_product(begin(a), end(a), begin(b), 0);
            }
        }
    });

    auto passedFilter = passedFilterCombinable.combine(plus<int>());
    auto specialValue = specialValueCombinable.combine(plus<uint32_t>());

    auto endTime = chrono::high_resolution_clock::now();

    Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue);
}

int main() {
    TestOriginalVersion();
    TestArrayParallelVersion();
}

在我的 8 核系统上,我看到了相当不错的加速,你的结果会根据你拥有的核心数量等而有所不同。

TestOriginalVersion
441579/400000000 (0.110395%) passed filter
2447300015
Elapsed time = 12525ms
TestArrayParallelVersion
441579/400000000 (0.110395%) passed filter
2447300015
Elapsed time = 657ms
于 2013-10-08T16:55:28.810 回答