我正在为我的应用程序编写一个简单的线程池,我在双核处理器上进行了测试。通常它运行良好,但我注意到当其他进程使用超过 50% 的处理器时,我的应用程序几乎停止。这让我很好奇,所以我决定重现这种情况并创建辅助应用程序,它只是运行无限循环(没有多线程),占用 50% 的处理器。当辅助程序运行时,多线程应用程序几乎像以前一样停止(处理速度从每秒 300-400 个任务下降到每秒 5-10 个任务)。但是,当我将多线程程序的进程亲和性更改为仅使用一个内核(辅助仍然使用两者)时,它开始工作,当然最多使用剩余 50% 的处理器。当我在我的应用程序中禁用多线程(仍在处理相同的任务,但没有线程池)时,它就像魅力一样工作,没有任何辅助设备减速,它仍在运行(这就是两个应用程序在两个内核上运行时应该表现的方式)。但是当我启用多线程时,问题又回来了。
我制作了特殊的代码来测试这个特定的线程池:
标题
#ifndef THREADPOOL_H_
#define THREADPOOL_H_
typedef double FloatingPoint;
#include <queue>
#include <vector>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <thread>
using namespace std;
struct ThreadTask
{
int size;
ThreadTask(int s)
{
size = s;
}
~ThreadTask()
{
}
};
class ThreadPool
{
protected:
queue<ThreadTask*> tasks;
vector<std::thread> threads;
std::condition_variable task_ready;
std::mutex variable_mutex;
std::mutex max_mutex;
std::atomic<FloatingPoint> max;
std::atomic<int> sleeping;
std::atomic<bool> running;
int threads_count;
ThreadTask * getTask();
void runWorker();
void processTask(ThreadTask*);
bool isQueueEmpty();
bool isTaskAvailable();
void threadMethod();
void createThreads();
void waitForThreadsToSleep();
public:
ThreadPool(int);
virtual ~ThreadPool();
void addTask(int);
void start();
FloatingPoint getValue();
void reset();
void clearTasks();
};
#endif /* THREADPOOL_H_ */
和.cpp
#include "stdafx.h"
#include <climits>
#include <float.h>
#include "ThreadPool.h"
ThreadPool::ThreadPool(int t)
{
running = true;
threads_count = t;
max = FLT_MIN;
sleeping = 0;
if(threads_count < 2) //one worker thread has no sense
{
threads_count = (int)thread::hardware_concurrency(); //default value
if(threads_count == 0) //in case it fails ('If this value is not computable or well defined, the function returns 0')
threads_count = 2;
}
printf("%d worker threads\n", threads_count);
}
ThreadPool::~ThreadPool()
{
running = false;
reset(); //it will make sure that all worker threads are sleeping on condition variable
task_ready.notify_all(); //let them finish in natural way
for (auto& th : threads)
th.join();
}
void ThreadPool::start()
{
createThreads();
}
FloatingPoint ThreadPool::getValue()
{
waitForThreadsToSleep();
return max;
}
void ThreadPool::createThreads()
{
threads.clear();
for(int i = 0; i < threads_count; ++i)
threads.push_back(std::thread(&ThreadPool::threadMethod, this));
}
void ThreadPool::threadMethod()
{
while(running)
runWorker();
}
void ThreadPool::runWorker()
{
ThreadTask * task = getTask();
processTask(task);
}
void ThreadPool::processTask(ThreadTask * task)
{
if(task == NULL)
return;
//do something to simulate processing
vector<int> v;
for(int i = 0; i < task->size; ++i)
v.push_back(i);
delete task;
}
void ThreadPool::addTask(int s)
{
ThreadTask * task = new ThreadTask(s);
std::lock_guard<std::mutex> lock(variable_mutex);
tasks.push(task);
task_ready.notify_one();
}
ThreadTask * ThreadPool::getTask()
{
std::unique_lock<std::mutex> lck(variable_mutex);
if(tasks.empty())
{
++sleeping;
task_ready.wait(lck);
--sleeping;
if(tasks.empty()) //in case of ThreadPool being deleted (destructor calls notify_all), or spurious notifications
return NULL; //return to main loop and repeat it
}
ThreadTask * task = tasks.front();
tasks.pop();
return task;
}
bool ThreadPool::isQueueEmpty()
{
std::lock_guard<std::mutex> lock(variable_mutex);
return tasks.empty();
}
bool ThreadPool::isTaskAvailable()
{
return !isQueueEmpty();
}
void ThreadPool::waitForThreadsToSleep()
{
while(isTaskAvailable())
std::this_thread::yield(); //wait for all tasks to be taken
while(true) //wait for all threads to finish they last tasks
{
if(sleeping == threads_count)
break;
std::this_thread::yield();
}
}
void ThreadPool::clearTasks()
{
std::unique_lock<std::mutex> lock(variable_mutex);
while(!tasks.empty()) tasks.pop();
}
void ThreadPool::reset() //don't call this when var_mutex is already locked by this thread!
{
clearTasks();
waitForThreadsToSleep();
max = FLT_MIN;
}
它是如何测试的:
ThreadPool tp(2);
tp.start();
int iterations = 1000;
int task_size = 1000;
for(int j = 0; j < iterations; ++j)
{
printf("\r%d left", iterations - j);
tp.reset();
for(int i = 0; i < 1000; ++i)
tp.addTask(task_size);
tp.getValue();
}
return 0;
我已经在 Win7 64 上使用 mingw 和 gcc 4.8.1(从这里)和 Visual Studio 2012 (VC11) 构建了这段代码,都在调试配置上。
使用上述编译器构建的两个程序的行为完全不同。
a) 使用 mingw 构建的程序比在 VS 上构建的程序运行得快得多,因为它可以占用整个处理器(系统显示此过程几乎 100% 的 CPU 使用率,所以我不认为 mingw 偷偷将亲和力设置为一个核心)。但是当我运行辅助程序(使用 50% 的 CPU)时,它会大大减慢(大约几十倍)。在这种情况下,主程序和辅助程序的 CPU 使用率约为 50%-50%。
b) 使用 VS 2012 构建的程序,当使用整个 CPU 时,甚至比 a) 速度慢(当我设置 task_size = 1 时,它们的速度相似)。但是当辅助程序运行时,主程序甚至占用了大部分 CPU(使用率约为 66% 主程序 - 33% 辅助程序),因此导致的减速几乎不明显。
当设置为仅使用一个核心时,两个程序都明显加快了速度(大约 1.5 - 2 倍),并且 mingw 一个不再容易受到竞争的影响。
嗯,现在我不知道该怎么办。我的程序在由两个不同的工具集构建时表现不同。这是我的代码中的一个缺陷(假设是真的),还是与 c++11 有问题的编译器有关?