9

triangle我想对应用在对象上的展开循环和 for 循环之间的执行速度差异进行基准测试。整个示例可在此处获得。

这是完整的代码:

#include <iostream>
#include <vector>
#include <array>
#include <random>
#include <functional>
#include <chrono>
#include <fstream>

template<typename RT>
class Point 
{
    std::array<RT,3> data; 

    public: 

        Point() = default;

        Point(std::initializer_list<RT>& ilist)
            :
                data(ilist)
        {}

        Point(RT x, RT y, RT z)
            :
                data({x,y,z})
        {};

        RT& operator[](int i)
        {
            return data[i];  
        }

        RT operator[](int i) const
        {
            return data[i];
        }

        const Point& operator += (Point const& other)
        {
            data[0] += other.data[0];
            data[1] += other.data[1];
            data[2] += other.data[2];

            return *this; 
        }

        const Point& operator /= (RT const& s)
        {
            data[0] /= s; 
            data[1] /= s;  
            data[2] /= s;  

            return *this;
        }

};

template<typename RT>
Point<RT> operator-(const Point<RT>& p1, const Point<RT>& p2)
{
    return Point<RT>(p1[0] - p2[0], p1[1] - p2[1], p1[2] - p2[2]);
}

template<typename RT>
std::ostream& operator<<(std::ostream& os , Point<RT> const& p)
{
    os << p[0] << " " << p[1] << " " << p[2]; 
    return os;
}

template<typename Point>
class Triangle 
{
    std::array<Point, 3> points; 

    public: 

        typedef typename std::array<Point, 3>::value_type value_type;

        typedef Point PointType; 

        Triangle() = default; 

        Triangle(std::initializer_list<Point>& ilist) 
            :
                points(ilist)
        {}

        Triangle(Point const& p1, Point const& p2, Point const& p3)
            :
                points({p1, p2, p3})
        {}

        Point& operator[](int i)
        {
            return points[i]; 
        }

        Point operator[](int i) const
        {
            return points[i]; 
        }

        auto begin()
        {
            return points.begin(); 
        }

        const auto begin() const
        {
            return points.begin(); 
        }

        auto end()
        {
            return points.end(); 
        }

        const auto end() const
        {
            return points.end(); 
        }
};

template<typename Triangle>
typename Triangle::PointType barycenter_for(Triangle const& triangle)
{
    typename Triangle::value_type barycenter; 

    for (const auto& point : triangle)
    {
        barycenter += point; 
    }

    barycenter /= 3.; 

    return barycenter; 
}

template<typename Triangle>
typename Triangle::PointType barycenter_unrolled(Triangle const& triangle)
{
    typename Triangle::PointType barycenter; 

    barycenter += triangle[0]; 
    barycenter += triangle[1]; 
    barycenter += triangle[2]; 

    barycenter /= 3.; 

    return barycenter; 
}

template<typename TriangleSequence>
typename TriangleSequence::value_type::value_type
barycenter(
    TriangleSequence const& triangles, 
    std::function
    <
        typename TriangleSequence::value_type::value_type (
            typename TriangleSequence::value_type const &
         )
    > triangle_barycenter 
)
{
    typename TriangleSequence::value_type::value_type barycenter; 

    for(const auto& triangle : triangles)
    {
        barycenter += triangle_barycenter(triangle); 
    }

    barycenter /= double(triangles.size()); 

    return barycenter; 
}

using namespace std;

int main(int argc, const char *argv[])
{
    typedef Point<double> point; 
    typedef Triangle<point> triangle; 

    const int EXP = (atoi(argv[1]));

    ofstream outFile; 
    outFile.open("results.dat",std::ios_base::app); 

    const unsigned int MAX_TRIANGLES = pow(10, EXP);

    typedef std::vector<triangle> triangleVector; 

    triangleVector triangles;

    std::random_device rd;
    std::default_random_engine e(rd());
    std::uniform_real_distribution<double> dist(-10,10); 

    for (unsigned int i = 0; i < MAX_TRIANGLES; ++i)
    {
        triangles.push_back(
            triangle(
                point(dist(e), dist(e), dist(e)),
                point(dist(e), dist(e), dist(e)),
                point(dist(e), dist(e), dist(e))
            )
        );
    }

    typedef std::chrono::high_resolution_clock Clock; 

    auto start = Clock::now();
    auto trianglesBarycenter = barycenter(triangles, [](const triangle& tri){return barycenter_for(tri);});
    auto end = Clock::now(); 

    auto forLoop = end - start; 

    start = Clock::now();
    auto trianglesBarycenterUnrolled = barycenter(triangles, [](const triangle& tri){return barycenter_unrolled(tri);});
    end = Clock::now(); 

    auto unrolledLoop = end - start; 

    cout << "Barycenter difference (should be a zero vector): " << trianglesBarycenter - trianglesBarycenterUnrolled << endl;

    outFile << MAX_TRIANGLES << " " << forLoop.count() << " " << unrolledLoop.count() << "\n"; 

    outFile.close();

    return 0;
}

该示例由 Point 类型和 Triangle 类型组成。基准计算是三角形重心的计算。可以通过 for 循环来完成:

for (const auto& point : triangle)
{
    barycenter += point; 
}

barycenter /= 3.; 

return barycenter; 

或者它可以展开,因为三角形具有三个点:

barycenter += triangle[0]; 
barycenter += triangle[1]; 
barycenter += triangle[2]; 

barycenter /= 3.; 

return barycenter; 

所以我想测试对于一组三角形来说,计算重心的函数会更快。为了充分利用测试,我通过执行带有整数指数参数的主程序来使正在操作的三角形的数量可变:

./main 6

产生 10^6 个三角形。三角形的数量范围从 100 到 1e06。主程序创建“results.dat”文件。为了分析结果,我编写了一个小的 Python 脚本:

#!/usr/bin/python

from matplotlib import pyplot as plt
import numpy as np
import os

results = np.loadtxt("results.dat")

fig = plt.figure()

ax1 = fig.add_subplot(111)
ax2 = ax1.twinx()

ax1.loglog(); 
ax2.loglog();

ratio = results[:,1] / results[:,2]

print("Speedup factors unrolled loop / for loop: ")
print(ratio)

l1 = ax1.plot(results[:,0], results[:,1], label="for loop", color='red')
l2 = ax1.plot(results[:,0], results[:,2], label="unrolled loop", color='black')
l3 = ax2.plot(results[:,0], ratio, label="speedup ratio", color='green')

lines  = [l1, l2, l3]; 

ax1.set_ylabel("CPU count")
ax2.set_ylabel("relative speedup: unrolled loop / for loop")

ax1.legend(loc="center right")
ax2.legend(loc="center left")

plt.savefig("results.png")

并利用您计算机上的所有内容,复制示例代码,编译它:

g++ -std=c++1y -O3 -Wall -pedantic -pthread main.cpp -o main

要绘制不同重心函数的测量 CPU 时间,请执行 python 脚本(我称之为plotResults.py):

 for i in {1..6}; do ./main $i; done
./plotResults.py

我期望看到的是展开循环和for循环之间的相对加速(for循环时间/展开循环时间)将随着三角形集的大小而增加。这个结论可以从一个逻辑得出:如果展开的循环比 for 循环快,则执行大量展开的循环应该比执行大量的 for 循环更快。这是由上述 python 脚本生成的结果图:

在此处输入图像描述

循环展开的影响很快就消失了。一旦我使用超过 100 个三角形,似乎没有区别。查看 python 脚本计算的加速比:

[ 3.13502399  2.40828402  1.15045831  1.0197221   1.1042312   1.26175165
  0.99736715]

使用 100 个三角形(列表中的 3d 位置对应于 10^2)时的加速比为 1.15。

我来这里是为了找出我做错了什么,因为这里一定有问题,恕我直言。:) 提前致谢。

编辑:绘制 cachegrind 缓存未命中率

如果程序是这样运行的:

for input in {2..6}; do valgrind --tool=cachegrind  ./main $input; done

cachegrind生成一堆输出文件,可以用grepfor解析PROGRAM TOTALS,代表以下数据的数字列表,取自cachegrind 手册

Cachegrind 收集以下统计信息(用于每个统计信息的缩写在括号中给出):

I cache reads (Ir, which equals the number of instructions executed), I1 cache read misses (I1mr) and LL cache instruction read

未命中 (ILmr)。

D cache reads (Dr, which equals the number of memory reads), D1 cache read misses (D1mr), and LL cache data read misses (DLmr).

D cache writes (Dw, which equals the number of memory writes), D1 cache write misses (D1mw), and LL cache data write misses (DLmw).

Conditional branches executed (Bc) and conditional branches mispredicted (Bcm).

Indirect branches executed (Bi) and indirect branches mispredicted (Bim).

并且“组合”缓存未命中率定义为:(ILmr + DLmr + DLmw) / (Ir + Dr + Dw)

输出文件可以这样解析:

for file in cache*; do cg_annotate $file | grep TOTALS >> program-totals.dat; done
sed -i 's/PROGRAM TOTALS//'g program-totals.dat

然后可以使用以下 python 脚本可视化生成的数据:

#!/usr/bin/python
from matplotlib import pyplot as plt
import numpy as np
import os
import locale

totalInput = [totalInput.strip().split(' ') for totalInput in open('program-totals.dat','r')]

locale.setlocale(locale.LC_ALL, 'en_US.UTF-8' ) 

totals = []

for line in totalInput:
    totals.append([locale.atoi(item) for item in line])

totals = np.array(totals)

# Assumed default output format
# Ir I1mr  ILmr Dr Dmr DLmr Dw D1mw DLmw
# 0   1     2   3   4   5   6   7    8
cacheMissRatios = (totals[:,2] + totals[:,5] + totals[:,8]) / (totals[:,0] + totals[:,3] + totals[:,6])

fig = plt.figure()
ax1 = fig.add_subplot(111)
ax1.loglog()

results = np.loadtxt("results.dat")
l1 = ax1.plot(results[:,0], cacheMissRatios, label="Cachegrind combined cache miss ratio", color='black', marker='x')
l1 = ax1.plot(results[:,0], results[:,1] / results[:,2], label="Execution speedup", color='blue', marker='o')

ax1.set_ylabel("Cachegrind combined cache miss ratio")
ax1.set_xlabel("Number of triangles")
ax1.legend(loc="center left")

plt.savefig("cacheMisses.png")

因此,在展开三角形访问循环时,绘制组合的 LL 未命中率与程序加速比,得到下图:

在此处输入图像描述

并且似乎对 LL 错误率存在依赖性:随着它的上升,程序的加速下降。但是,我仍然看不到瓶颈的明确原因。

综合 LL 未命中率是否适合分析?看看 valgrind 的输出,据报道所有的未命中率都低于 5%,这应该还不错吧?

4

3 回答 3

4

即使您展开,计算 barycenter也是一次完成的。此外,计算的每一步(对于单个重心)都依赖于前一步,这意味着它们不能并行化。您当然可以通过一次计算重心而不是一个重心来获得更好的吞吐量n,并对各种值进行基准测试n以确定哪些数量会使 CPU 管道饱和。

另一个可能有助于加快计算速度的方面是数据布局:与其将三角形点一起存储在一个结构中,您可以尝试将它们拆分为 3 个不同的数组(每个点一个),然后再次使用不同的值进行基准测试n

关于您的主要问题,除非代码转换降低了底层算法的复杂程度(这是完全可能的),否则所获得的速度最多应该与数据集大小成线性关系,但是如果数据集足够大,则很可能达到不同的限制(例如,当一级内存 - 缓存级别 1、级别 2、主内存 - 饱和时会发生什么?)。

于 2014-12-01T15:58:52.113 回答
1
  1. 对于小型数据集,您的第二个 BarycenterUnrolled 循环更快,因为数据集足够小,可以进行 L2/L3 缓存优化。尝试交换您在程序中运行测试的顺序。一个看似合乎逻辑的决定可能是将测试作为单独的进程运行,但这也并不总是有效:L2/L3 缓存是持久的。每个过程的后续运行可能会产生不同的结果。(请参阅下面的更多细节)

  2. 您在频谱上观察到的其余差异是噪声。在这两种情况下,您的 GCC 编译器都会生成几乎相同的代码。当指定 -O3 时,GCC 以积极展开循环(例如那个循环)而闻名。事实上,GCC 在某些情况下会展开循环多达 16 或 24 次迭代——这有时会损害某些移动芯片架构的性能。

此外,您可以使用 -fno-unroll-loops 进行测试......虽然我怀疑您会看到很多差异,因为您的算法的主要瓶颈是,按以下顺序:

  1. 除法运算 (/= 3)
  2. 记忆

关于在短数据集上运行适当的基准测试:

为了避免短数据集上的 L2/L3 缓存噪声,您需要在每次基准测试之前刷新缓存。这通常是通过在 ~16MB - 32MB 的堆中分配一些大块数据并对其进行读/写垃圾来完成的。在您的情况下,为每个测试构建完全不同的三角形列表也是可取的。

但最好的建议通常是:“不要在小数据集上运行基准测试”。相反,只在非常大的数据集上运行基准测试,然后在大数据集上也对小数据集使用最好的。这适用于微优化情况,例如循环展开或 cpu 指令计数。如果您正在执行更高级别的算法,例如排序或树遍历,并且知道您的主要用例将是小型数据集,那么应该使用一组不同的基准标准。在这些情况下,我更喜欢通过连接数十个小数据集来创建一个“大”数据集。这将强调算法中可能成为小型数据集瓶颈的部分,例如设置和结果处理。

于 2014-12-06T19:44:15.623 回答
0

循环展开可以节省循环的开销。如果执行循环所需的时间比执行循环的每个单次迭代的时间小,那么您将不会节省太多。

情况可能更糟。您的处理器有许多独立工作的单元。例如,您可能有一个内存单元、一个浮点单元和一个整数单元。您的代码将花费这些单元中最慢的时间。循环(递增索引,检查它是否足够小,从循环的开头开始)由整数单元执行。如果您的代码在内存单元中需要 100 毫秒,在浮点单元中需要 80 毫秒,在整数单元中需要 60 毫秒,那么它需要 100 毫秒。浮点或整数单位的任何节省都不会让它更快。

请注意,对于小示例,所有数据都适合缓存。因此内存单元将花费相对较少的时间。假设您有一个小样本,没有展开需要 60µs(内存)、60µs(浮点)和 80µs(整数)。现在循环展开可以帮助将总时间从 80µs 减少到 60µs。

于 2016-03-10T16:17:15.177 回答