这确实应该是对 Mats Petersson 的回答的评论,但我想提供代码示例。
问题是特定资源的争用以及缓存行。
备选方案 1:
#include <cstdint>
#include <thread>
#include <vector>
#include <stdlib.h>
static const uint64_t ITERATIONS = 10000000000ULL;
int main(int argc, const char** argv)
{
size_t numThreads = 1;
if (argc > 1) {
numThreads = strtoul(argv[1], NULL, 10);
if (numThreads == 0)
return -1;
}
std::vector<std::thread> threads;
uint64_t k = 0;
for (size_t t = 0; t < numThreads; ++t) {
threads.emplace_back([&k]() { // capture k by reference so we all use the same k.
while (k < ITERATIONS) {
k++;
}
});
}
for (size_t t = 0; t < numThreads; ++t) {
threads[t].join();
}
return 0;
}
在这里,线程争用一个变量,同时执行读取和写入,这迫使它乒乓球导致争用并使单线程情况最有效。
#include <cstdint>
#include <thread>
#include <vector>
#include <stdlib.h>
#include <atomic>
static const uint64_t ITERATIONS = 10000000000ULL;
int main(int argc, const char** argv)
{
size_t numThreads = 1;
if (argc > 1) {
numThreads = strtoul(argv[1], NULL, 10);
if (numThreads == 0)
return -1;
}
std::vector<std::thread> threads;
std::atomic<uint64_t> k = 0;
for (size_t t = 0; t < numThreads; ++t) {
threads.emplace_back([&]() {
// Imperfect division of labor, we'll fall short in some cases.
for (size_t i = 0; i < ITERATIONS / numThreads; ++i) {
k++;
}
});
}
for (size_t t = 0; t < numThreads; ++t) {
threads[t].join();
}
return 0;
}
在这里,我们确定性地分工(我们遇到了 numThreads 不是 ITERATIONS 的除数但对于这个演示来说足够接近的情况)。不幸的是,我们仍然遇到访问内存中共享元素的争用。
#include <cstdint>
#include <thread>
#include <vector>
#include <stdlib.h>
#include <atomic>
static const uint64_t ITERATIONS = 10000000000ULL;
int main(int argc, const char** argv)
{
size_t numThreads = 1;
if (argc > 1) {
numThreads = strtoul(argv[1], NULL, 10);
if (numThreads == 0)
return -1;
}
std::vector<std::thread> threads;
std::vector<uint64_t> ks;
for (size_t t = 0; t < numThreads; ++t) {
threads.emplace_back([=, &ks]() {
auto& k = ks[t];
// Imperfect division of labor, we'll fall short in some cases.
for (size_t i = 0; i < ITERATIONS / numThreads; ++i) {
k++;
}
});
}
uint64_t k = 0;
for (size_t t = 0; t < numThreads; ++t) {
threads[t].join();
k += ks[t];
}
return 0;
}
同样,这对于工作负载的分配是确定性的,最后我们会花费少量精力来整理结果。然而,我们没有采取任何措施来确保计数器的分布有利于 CPU 的健康分布。为了那个原因:
#include <cstdint>
#include <thread>
#include <vector>
#include <stdlib.h>
static const uint64_t ITERATIONS = 10000000000ULL;
#define CACHE_LINE_SIZE 128
int main(int argc, const char** argv)
{
size_t numThreads = 1;
if (argc > 1) {
numThreads = strtoul(argv[1], NULL, 10);
if (numThreads == 0)
return -1;
}
std::vector<std::thread> threads;
std::mutex kMutex;
uint64_t k = 0;
for (size_t t = 0; t < numThreads; ++t) {
threads.emplace_back([=, &k]() {
alignas(CACHE_LINE_SIZE) uint64_t myK = 0;
// Imperfect division of labor, we'll fall short in some cases.
for (uint64_t i = 0; i < ITERATIONS / numThreads; ++i) {
myK++;
}
kMutex.lock();
k += myK;
kMutex.unlock();
});
}
for (size_t t = 0; t < numThreads; ++t) {
threads[t].join();
}
return 0;
}
在这里,我们避免线程之间的争用一直到缓存行级别,除了最后我们使用互斥锁来控制同步的单一情况。对于这种微不足道的工作量,互斥锁将具有非常高的相对成本。或者,您可以使用 alignas 在外部范围内为每个线程提供其自己的存储,并在连接后汇总结果,从而消除对互斥体的需要。我把它作为练习留给读者。