
I have the following code which starts several threads (a thread pool) right at the beginning (startWorkers()). Later, at some point, I have a container full of myWorkObject instances which I want to process using several worker threads simultaneously. In terms of memory usage, each myWorkObject is completely isolated from the others. Now let's assume myWorkObject has a method doWorkIntenseStuffHere() that takes some CPU time to compute.

When benchmarking the following code, I noticed that it does not scale well with the number of threads: the overhead of initializing/synchronizing the worker threads outweighs the benefit of multithreading unless 3-4 threads are active. I have looked into this and read about the false-sharing problem, and I believe my code suffers from it. However, I would like to debug/profile my code to see whether there is some kind of starvation/false sharing going on. How can I do this? Please feel free to criticize my code, since I am still learning a lot about memory/CPU and multithreading.

#include <boost/thread.hpp>

class MultiThreadedFitnessProcessingStrategy
{
public:
    MultiThreadedFitnessProcessingStrategy(unsigned int numWorkerThreads):
        _startBarrier(numWorkerThreads + 1),
        _endBarrier(numWorkerThreads + 1),
        _started(false),
        _shutdown(false),
        _numWorkerThreads(numWorkerThreads)
    {
        assert(_numWorkerThreads > 0);
    }


    virtual ~MultiThreadedFitnessProcessingStrategy()
    {
        stopWorkers();
    }


void startWorkers()
{
    _shutdown = false;
    _started = true;

    for(unsigned int i = 0; i < _numWorkerThreads;i++)
    {
        boost::thread*  workerThread = new boost::thread(
                boost::bind(&MultiThreadedFitnessProcessingStrategy::workerTask, this,i)
        );
        _threadQueue.push_back(new std::queue<myWorkObject::ptr>());
        _workerThreads.push_back(workerThread);
    }
}


void stopWorkers()
{
    _startBarrier.wait();
    _shutdown = true;
    _endBarrier.wait();

    for(unsigned int i = 0; i < _numWorkerThreads;i++)
    {
        _workerThreads[i]->join();
    }

}

void workerTask(unsigned int id)
{

    //Wait until all worker threads have started.
    while(true)
    {
        //Wait for any input to become available.
        _startBarrier.wait();

        bool queueEmpty = false;
        std::queue<myWorkObject::ptr>* myThreadq(_threadQueue[id]);

        while(!queueEmpty)
        {

            myWorkObject::ptr workObject;

            //Make sure the queue is not empty.
            //Caution: this is necessary if the start barrier was triggered without queue input (e.g., shutdown), which can happen.
            //Do not try to be smart and refactor this without knowing what you are doing!
            queueEmpty = myThreadq->empty();


            if(!queueEmpty)
            {
                workObject = myThreadq->front();
                assert(workObject);
                myThreadq->pop();
            }

            if(workObject)
            {
                workObject->doWorkIntenseStuffHere();
            }
        }

        //Wait until all worker threads have synchronized.
        _endBarrier.wait();

        if(_shutdown)
        {
            return;
        }
    }
}


void doWork(const myWorkObject::chromosome_container &refcontainer)
{

    if(!_started)
    {
        startWorkers();
    }

    unsigned int j = 0;
    for(myWorkObject::chromosome_container::const_iterator it = refcontainer.begin();
            it != refcontainer.end();++it)
    {
        if(!(*it)->hasFitness())
        {
            assert(*it);
            _threadQueue[j%_numWorkerThreads]->push(*it);
            j++;
        }
    }

    //Start Signal!
    _startBarrier.wait();

    //Wait for workers to be complete
    _endBarrier.wait();

}


    unsigned int getNumWorkerThreads() const
    {
        return _numWorkerThreads;
    }

    bool isStarted() const
    {
        return _started;
    }


private:

    boost::barrier _startBarrier;
    boost::barrier _endBarrier;

    bool _started;
    bool _shutdown;

    unsigned int _numWorkerThreads;

    std::vector<boost::thread*> _workerThreads;

    //One input queue per worker thread; doWork() fills them round-robin.
    std::vector< std::queue<myWorkObject::ptr>* > _threadQueue;


};
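
For completeness, this is roughly how the class is driven (a minimal sketch; the thread count and container name are placeholders, and myWorkObject/chromosome_container are defined elsewhere in my code):

MultiThreadedFitnessProcessingStrategy strategy(boost::thread::hardware_concurrency());

myWorkObject::chromosome_container population;  //filled by the caller
strategy.doWork(population);  //lazily starts the workers, then blocks until all queued objects are processed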

2 Answers


If you are on Linux, there is a tool called valgrind, one of whose modules simulates cache effects (cachegrind). Take a look at

http://valgrind.org/docs/manual/cg-manual.html
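
A typical invocation looks like this (the binary name is just a placeholder; cg_annotate summarizes the cachegrind.out.<pid> file the run produces):

valgrind --tool=cachegrind ./your_program
cg_annotate cachegrind.out.<pid>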

Answered 2015-03-16T16:28:54.680

Sampling-based profiling can give you a pretty good idea whether you're experiencing false sharing. Here's a previous thread that describes a few ways to approach the issue. I don't think that thread mentioned Linux's perf utility. It's a quick, easy and free way to count cache misses that might tell you what you need to know (am I experiencing a significant number of cache misses that correlates with how many times I'm accessing a particular variable?).
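
For example, counting cache misses with perf could look like this (the binary name is a placeholder, and the exact event names can vary by kernel/CPU):

perf stat -e cache-references,cache-misses ./your_program
perf record -e cache-misses ./your_program && perf report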

If you do find that your threading scheme might be causing a lot of conflict misses, you could try declaring your myWorkObject instances, or the data within them that you're actually concerned about, with __attribute__((aligned(64))) (alignment to 64-byte cache lines).
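
As a minimal sketch (the class and members here are hypothetical stand-ins for your myWorkObject, and 64 bytes is the usual x86 cache-line size), that could look like:

#include <boost/shared_ptr.hpp>

//Hypothetical work object: the whole object is aligned to a 64-byte cache line,
//so two objects processed by different threads never share a line.
class myWorkObject
{
public:
    typedef boost::shared_ptr<myWorkObject> ptr;

    myWorkObject() : _fitness(0.0) {}

    void doWorkIntenseStuffHere()
    {
        //CPU-heavy work that only touches this object's own state.
        for (int i = 0; i < 1000000; ++i)
            _fitness += i * 0.5;
    }

private:
    double _fitness;
    //... other per-object state ...
} __attribute__((aligned(64)));  //GCC/Clang extension; C++11 alignas(64) is the portable spelling

The same attribute can also be applied to individual data members instead of the whole class if only a few fields are contended.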

Answered 2015-03-16T15:49:32.850