0

我正在为我的应用程序编写一个简单的线程池,我在双核处理器上进行了测试。通常它运行良好,但我注意到当其他进程使用超过 50% 的处理器时,我的应用程序几乎停止。这让我很好奇,所以我决定重现这种情况并创建辅助应用程序,它只是运行无限循环(没有多线程),占用 50% 的处理器。当辅助程序运行时,多线程应用程序几乎像以前一样停止(处理速度从每秒 300-400 个任务下降到每秒 5-10 个任务)。但是,当我将多线程程序的进程亲和性更改为仅使用一个内核(辅助仍然使用两者)时,它开始工作,当然最多使用剩余 50% 的处理器。当我在我的应用程序中禁用多线程(仍在处理相同的任务,但没有线程池)时,它就像魅力一样工作,没有任何辅助设备减速,它仍在运行(这就是两个应用程序在两个内核上运行时应该表现的方式)。但是当我启用多线程时,问题又回来了。

我制作了特殊的代码来测试这个特定的线程池:

标题

#ifndef THREADPOOL_H_
#define THREADPOOL_H_

typedef double FloatingPoint;

#include <queue>
#include <vector>

#include <mutex>
#include <atomic>
#include <condition_variable>
#include <thread>

using namespace std;

struct ThreadTask
{
    int size;

    ThreadTask(int s)
    {
        size = s;
    }
    ~ThreadTask()
    {
    }
};

class ThreadPool
{
protected:
    queue<ThreadTask*> tasks;
    vector<std::thread> threads;
    std::condition_variable task_ready;
    std::mutex variable_mutex;
    std::mutex max_mutex;

    std::atomic<FloatingPoint> max;
    std::atomic<int> sleeping;
    std::atomic<bool> running;

    int threads_count;

    ThreadTask * getTask();
    void runWorker();
    void processTask(ThreadTask*);
    bool isQueueEmpty();
    bool isTaskAvailable();
    void threadMethod();
    void createThreads();
    void waitForThreadsToSleep();
public:
    ThreadPool(int);
    virtual ~ThreadPool();

    void addTask(int);
    void start();
    FloatingPoint getValue();

    void reset();
    void clearTasks();
};

#endif /* THREADPOOL_H_ */

和.cpp

#include "stdafx.h"
#include <climits>
#include <float.h>

#include "ThreadPool.h"

ThreadPool::ThreadPool(int t)
{
    running = true;
    threads_count = t;
    max = FLT_MIN;
    sleeping = 0;

    if(threads_count < 2)                                       //one worker thread has no sense
    {
        threads_count = (int)thread::hardware_concurrency();    //default value

        if(threads_count == 0)  //in case it fails ('If this value is not computable or well defined, the function returns 0')
            threads_count = 2;
    }

    printf("%d worker threads\n", threads_count);
}

ThreadPool::~ThreadPool()
{
    running = false;

    reset();                    //it will make sure that all worker threads are sleeping on condition variable
    task_ready.notify_all();    //let them finish in natural way

    for (auto& th : threads)
        th.join();
}

void ThreadPool::start()
{
    createThreads();
}

FloatingPoint ThreadPool::getValue()
{
    waitForThreadsToSleep();

    return max;
}

void ThreadPool::createThreads()
{
    threads.clear();

    for(int i = 0; i < threads_count; ++i)
        threads.push_back(std::thread(&ThreadPool::threadMethod, this));
}

void ThreadPool::threadMethod()
{
    while(running)
        runWorker();
}

void ThreadPool::runWorker()
{
    ThreadTask * task = getTask();
    processTask(task);
}

void ThreadPool::processTask(ThreadTask * task)
{
    if(task == NULL)
        return;

    //do something to simulate processing

    vector<int> v;

    for(int i = 0; i < task->size; ++i)
        v.push_back(i);

    delete task;
}

void ThreadPool::addTask(int s)
{
    ThreadTask * task = new ThreadTask(s);

    std::lock_guard<std::mutex> lock(variable_mutex);
    tasks.push(task);

    task_ready.notify_one();
}

ThreadTask * ThreadPool::getTask()
{
    std::unique_lock<std::mutex> lck(variable_mutex);

    if(tasks.empty())
    {
        ++sleeping;
        task_ready.wait(lck);
        --sleeping;
        if(tasks.empty())   //in case of ThreadPool being deleted (destructor calls notify_all), or spurious notifications
            return NULL;    //return to main loop and repeat it
    }

    ThreadTask * task = tasks.front();
    tasks.pop();

    return task;
}

bool ThreadPool::isQueueEmpty()
{
    std::lock_guard<std::mutex> lock(variable_mutex);

    return tasks.empty();
}

bool ThreadPool::isTaskAvailable()
{
    return !isQueueEmpty();
}

void ThreadPool::waitForThreadsToSleep()
{
    while(isTaskAvailable())
        std::this_thread::yield();  //wait for all tasks to be taken
    while(true) //wait for all threads to finish they last tasks
    {
        if(sleeping == threads_count)
            break;

        std::this_thread::yield();
    }
}

void ThreadPool::clearTasks()
{
    std::unique_lock<std::mutex> lock(variable_mutex);

    while(!tasks.empty()) tasks.pop();
}

void ThreadPool::reset()    //don't call this when var_mutex is already locked by this thread!
{
    clearTasks();

    waitForThreadsToSleep();

    max = FLT_MIN;
}

它是如何测试的:

ThreadPool tp(2);
tp.start();

int iterations = 1000;
int task_size = 1000;

for(int j = 0; j < iterations; ++j)
{
    printf("\r%d left", iterations - j);

    tp.reset();
    for(int i = 0; i < 1000; ++i)
        tp.addTask(task_size);

    tp.getValue();  
}


return 0;

我已经在 Win7 64 上使用 mingw 和 gcc 4.8.1(从这里)和 Visual Studio 2012 (VC11) 构建了这段代码,都在调试配置上。

使用上述编译器构建的两个程序的行为完全不同。

a) 使用 mingw 构建的程序比在 VS 上构建的程序运行得快得多,因为它可以占用整个处理器(系统显示此过程几乎 100% 的 CPU 使用率,所以我不认为 mingw 偷偷将亲和力设置为一个核心)。但是当我运行辅助程序(使用 50% 的 CPU)时,它会大大减慢(大约几十倍)。在这种情况下,主程序和辅助程序的 CPU 使用率约为 50%-50%。

b) 使用 VS 2012 构建的程序,当使用整个 CPU 时,甚至比 a) 速度慢(当我设置 task_size = 1 时,它们的速度相似)。但是当辅助程序运行时,主程序甚至占用了大部分 CPU(使用率约为 66% 主程序 - 33% 辅助程序),因此导致的减速几乎不明显。

当设置为仅使用一个核心时,两个程序都明显加快了速度(大约 1.5 - 2 倍),并且 mingw 一个不再容易受到竞争的影响。

嗯,现在我不知道该怎么办。我的程序在由两个不同的工具集构建时表现不同。这是我的代码中的一个缺陷(假设是真的),还是与 c++11 有问题的编译器有关?

4

0 回答 0