我认为使用多线程处理简单而繁重的工作(例如矩阵计算)比使用单线程更好,因此我测试了以下代码:
int main()
{
constexpr int N = 100000;
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_real_distribution<double> ini(0.0, 10.0);
// single-thread
{
std::vector<int> vec(N);
for(int i = 0; i < N; ++i)
{
vec[i] = ini(mt);
}
auto start = std::chrono::system_clock::now();
for(int i = 0; i < N; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
auto end = std::chrono::system_clock::now();
auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "single : " << dur << " ms."<< std::endl;
}
// multi-threading (Th is the number of threads)
for(int Th : {1, 2, 4, 8, 16})
{
std::vector<int> vec(N);
for(int i = 0; i < N; ++i)
{
vec[i] = ini(mt);
}
auto start = std::chrono::system_clock::now();
std::vector<std::future<void>> fut(Th);
for(int t = 0; t < Th; ++t)
{
fut[t] = std::async(std::launch::async, [t, &vec, &N, &Th]{
for(int i = t*N / Th; i < (t + 1)*N / Th; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
});
}
for(int t = 0; t < Th; ++t)
{
fut[t].get();
}
auto end = std::chrono::system_clock::now();
auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "Th = " << Th << " : " << dur << " ms." << std::endl;
}
return 0;
}
执行环境:
OS : Windows 10 64-bit
Build-system : Visual Studio Community 2015
CPU : Core i5 4210U
在调试模式下构建这个程序时,结果如我所料:
single : 146 ms.
Th = 1 : 140 ms.
Th = 2 : 71 ms.
Th = 4 : 64 ms.
Th = 8 : 61 ms.
Th = 16 : 68 ms.
这表示不使用 std::async 的代码与使用单线程的代码具有相同的性能,并且当使用 4 或 8 个线程时,我可以获得出色的性能。
但是,在发布模式下,我得到了不同的结果 (N : 100000 -> 100000000):
single : 54 ms.
Th = 1 : 443 ms.
Th = 2 : 285 ms.
Th = 4 : 205 ms.
Th = 8 : 206 ms.
Th = 16 : 221 ms.
我想知道这个结果。只是对于后半部分代码,多线程的性能比单线程更好。但最快的是前半部分代码,它不使用 std::async。我知道多线程的优化和开销对性能有很大影响。然而,
- 该过程只是向量的计算,那么在多线程代码中而不是在单线程代码中可以优化什么?
- 该程序不包含任何关于互斥或原子等的内容,并且可能不会发生数据冲突。我认为多线程的开销会相对较小。
- 不使用 std::async 的代码中的 CPU 利用率小于多线程代码中的 CPU 利用率。使用大部分 CPU 是否有效?
更新:我试图研究矢量化。我启用/Qvec-report:1
了选项并得到了事实:
//vectorized (when N is large)
for(int i = 0; i < N; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
//not vectorized
auto lambda = [&vec, &N]{
for(int i = 0; i < N; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
};
lambda();
//not vectorized
std::vector<std::future<void>> fut(Th);
for(int t = 0; t < Th; ++t)
{
fut[t] = std::async(std::launch::async, [t, &vec, &N, Th]{
for(int i = t*N / Th; i < (t + 1)*N / Th; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
});
}
和运行时间:
single (with vectorization) : 47 ms.
single (without vectorization) : 70 ms.
可以肯定的是,for-loop 在多线程版本中没有向量化。但是,由于任何其他原因,该版本也需要很多时间。
更新 2:我在 lambda 中重写了 for 循环(A 型到 B 型):
//Type A (the previous one)
fut[t] = std::async(std::launch::async, [t, &vec, &N, Th]{
for(int i = t*N / Th; i < (t + 1)*N / Th; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
});
//Type B (the new one)
fut[t] = std::async(std::launch::async, [t, &vec, &N, Th]{
int nb = t * N / Th;
int ne = (t + 1) * N / Th;
for(int i = nb; i < ne; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
});
B型运行良好。结果 :
single (vectorized) : 44 ms.
single (invectorized) : 77 ms.
--
Th = 1 (Type A) : 435 ms.
Th = 2 (Type A) : 278 ms.
Th = 4 (Type A) : 219 ms.
Th = 8 (Type A) : 212 ms.
--
Th = 1 (Type B) : 112 ms.
Th = 2 (Type B) : 74 ms.
Th = 4 (Type B) : 60 ms.
Th = 8 (Type B) : 61 ms.
类型 B 的结果是可以理解的(多线程代码会比单线程向量化代码运行得更快,并且不如向量化代码快)。另一方面,A 型似乎等同于 B 型(仅使用临时变量),但它们显示出不同的性能。这两种类型可以认为是产生了不同的汇编代码。
更新 3:我可能会发现一个减慢多线程 for 循环的因素。是条件下的除法for
。这是单线程测试:
//ver 1 (ordinary)
fut[t] = std::async(std::launch::async, [&vec, &N]{
for(int i = 0; i < N; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
});
//ver 2 (introducing a futile variable Q)
int Q = 1;
fut[t] = std::async(std::launch::async, [&vec, &N, Q]{
for(int i = 0; i < N / Q; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
});
//ver 3 (using a temporary variable)
int Q = 1;
fut[t] = std::async(std::launch::async, [&vec, &N, Q]{
int end = N / Q;
for(int i = 0; i < end; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
});
//ver 4 (using a raw value)
fut[t] = std::async(std::launch::async, [&vec]{
for(int i = 0; i < 100000000; ++i)
{
vec[i] = 2 * vec[i] + 3 * vec[i] - vec[i];
}
});
和运行时间:
ver 1 : 132 ms.
ver 2 : 391 ms.
ver 3 : 47 ms.
ver 4 : 43 ms.
版本 3 和 4得到了很好的优化,版本 1没有那么多,因为我认为编译器无法捕捉到 N 是不变的,尽管 N 是constexpr
. 由于同样的原因,我认为第 2 版非常慢。编译器不明白 N 和 Q 不会变化。所以这个条件i < N / Q
需要大量的汇编代码,这会减慢 for 循环。