1

我们正在开发一个项目来生成一个通用线程类,该类允许我们处理一组相互关联的数据。基本思想是在不同的线程中仅评估未连接且可以同时处理的数据集。我们开发了一个基于 boost::thread 的 ThreadClass 和一个基于 boost::mutex 的 OF_bmutex 类来执行日志操作。

代码方案在链接的pdf(http://cdm.unimore.it/dep/test.pd)中,而主要类的骨架如下......

// encapsulate boost::mutex to log...
class OF_bmutex{

public:

    std::string  mutex_type;
    int m_id;
    boost::mutex  m;

    void  lock(){ 
       std::cout << "Mutex " << mutex_type << m_id << " locking from " << boost::this_thread::get_id() << std::endl;       
       m.lock();       
       std::cout << "Mutex " << mutex_type << m_id << " locked from " <<  boost::this_thread::get_id()  << std::endl; 
    }

    void  unlock(){ 
       std::cout <<  "Mutex "  << mutex_type << m_id << " unlocking from " << boost::this_thread::get_id() << std::endl;
       m.unlock(); 
       std::cout << "Mutex " << mutex_type << m_id << " unlocked from " << boost::this_thread::get_id() << std::endl;
    }

    bool  try_lock(){ 
        std::cout <<  "Mutex " << mutex_type << m_id << " try locking from " <<  boost::this_thread::get_id() << std::endl;
        bool ret = m.try_lock();
        if( ret ){ 
            std::cout << "Mutex " << mutex_type << m_id << " try locked from " << boost::this_thread::get_id() << std::endl;
        }
        return(ret);
    }
 };




// My thread class
class OF_ThreadClass {
    private:
        //! running variable
        bool running;
        //! The thread executing this process...
        boost::thread *m_thread;
        //! The data to process...
        LinkedDataSet *my_data;
        //! The id of this thread
        int thread_id;
        //! Process the data...
        virtual int processData();
    public:
        //! The boost thread id
        boost::thread::id   boost_id;
        //! Thread function
        void operator () ();
        //! Default constructor
        OF_ThreadClass();
        //! Destructor
        ~OF_ThreadClass();
        //! Connect this thread with the process data to evaluate 
        void setProcessData( DataToProcess *pd );
        //! return the thread id
        int getId() const { return this->thread_id; }
        //! post process the thread...
        void post_process();
};




// The core function with the execution point of the Thread class...
void  OF_ThreadClass::operator () (){

    while( this->running ){

        OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock();
        OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->lock();

        // PUT HERE OUR CODE...
        if( running == true ){
            if( my_data != NULL ){
                this->processData();
            }
            this->my_data->done = true;
        }
        std::cout << ">>>>>> Thread " << thread_id << " notified that evaluation terminated\n"; 

        OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->unlock();              
        OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->lock();
    }
    OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock();
}




// A class to perform multithread calculation...
class OF_SmartThreads{
    private:

    //! The number of threads to activate
    int     _n_threads;
    //! The polling time
    int     _polling_time;
    //! The thread pool...
    std::vector< OF_ThreadClass *> threadPool;

    //! The stack of the available threads 
    std::set< OF_ThreadClass *> *OF_AVAILABLE_THREADS;
    //! The set of the running threads
    std::set< OF_ThreadClass *> OF_RUNNING_THREADS;

    //! The set of the locked datasets
    std::set< LinkedDataSet* >  locked_data;
    //! The set of the available datasets
    std::set< LinkedDataSet* >  unlocked_data;
    //! The set of the datasets under processing
    std::set< LinkedDataSet* >  processing_data;

    //! The size of the progress bar
    int progBarDim;

    public:
    //! Constructor
    OF_SmartThreads();
    //! Destructor
    ~OF_SmartThreads();

    //! Initialize the SmartThreads
    int init_function( std::list< LinkedDataSet * > *dList, int n_max_threads);
    //! Initialize the SmartThreads
    int init_function( std::set< LinkedDataSet * > *dSet, int n_max_threads);

    //! Process all the cuncurrent threads..
    int process_data();
    //! Process all the cuncurrent threads..
    int process_data_polling( int polling_time );
    //! stop the process..
    int post_process();
};




// Initialization function...
int OF_SmartThreads::init_function( ... ){

    // in the main thread...

    // Fill the pool of thread mutex...
    for(int i = 0; i< _n_threads; i++ ){

        _tm = new OF_BMUTEX;
        _tm->mutex_type.assign( "A" );
        _tm->m_id = i;
        OF_AVAILABLE_THREADS_MUTEX.push_back( _tm );

        _tm = new OF_BMUTEX;
        _tm->mutex_type.assign( "R" );
        _tm->m_id = i;
        OF_RUNNING_THREADS_MUTEX.push_back( _tm );
    }

    // Create the threads...
    threadPool.resize( _n_threads );
    for(int i = 0; i< _n_threads; i++ ){

        // ...preventivally lock the resources...
        OF_RUNNING_THREADS_MUTEX[i]->lock();
        OF_AVAILABLE_THREADS_MUTEX[i]->unlock();

        // ..create the new thread...
        pc = new OF_ThreadClass;
        // insert the new thread in the list...
        threadPool.at( pc->getId() ) = pc;
        // set it as available...
        OF_AVAILABLE_THREADS->insert( pc );
    }
}



// Execution function...
void process_data_polling( int _polling_time ){
    while ( running ){
        if ( something_changed ){   

            //Print the status on the screen...
            ...
        }

        something_changed = false;

        // Poll the status of the processing data periodically      
        boost::this_thread::sleep(boost::posix_time::millisec( _polling_time ));

        // Are there some data ready to process?
        if( OF_UNLOCKED_DATASETS->size() > 0 ){

            // Take the first 
            pd = *OF_UNLOCKED_DATASETS->begin();

            // are there some threads available?
            if( OF_AVAILABLE_THREADS->size() != 0 ){

                //...lock and move the datasets linked to pd...
                ret = lock_data( pd, LOCK ); 

                std::cout << "\tNumber of available threads: " << OF_AVAILABLE_THREADS->size() << std::endl;

                // Take the available thread...
                pc = *OF_AVAILABLE_THREADS->begin();

                // ...link it the dataset to process...
                pc->setProcess( pd );

                OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->lock();

                OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->unlock();

                something_changed = true;

            } // available threads  
        } // unlock datasets



        // Find, unlock and remove finished datasets...
        pIter2  = OF_RUNNING_THREADS->begin();
        pEnd2   = OF_RUNNING_THREADS->end();
        while( pIter2 != pEnd2 ){
            pc = *pIter2++;
            pd = pc->getDataSet();

            if( pd->isDone() ){

                //...unlock and move the datasets linked to the current dataset...
                ret_move = lock_data( pd, RELEASE_LOCK ); 

                //...remove the data from the active set
                ret_remove = OF_ACTIVE_DATASETS->erase( pd );

                // make the threads available
                moveThreads( pc, _RUNNING_, _AVAILABLE_ );

                something_changed = true;
            }
        }



        pIter2  = OF_AVAILABLE_THREADS->begin();
        pEnd2   = OF_AVAILABLE_THREADS->end();
        while( pIter2 != pEnd2 ){
            pc = *pIter2++;

            bool obtained = OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->try_lock();
            if( obtained ){
                std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " obtained running mutex..." << std::endl;
            }
            else{
                std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " failed to obtain running mutex..." << std::endl;
            }
            OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->unlock();
            std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " released available mutex..." << std::endl;
        }

        if(  ( OF_LOCKED_DATASETS->size() + OF_UNLOCKED_DATASETS->size() + OF_ACTIVE_DATASETS->size() ) > 0 ){
            running = true;
        }
        else{
            running = false;
        }

    } // end running...
}



// The main function...
int main( int argc, char* argv[]) {

    init_function( &data, INT_MAX );

    process_data_polling( 100 );

    lc.post_process();

    return 0;
}

在 Linux 和 OSX 上使用 boost 1.53 编译时,所有系统都能完美运行。使用的线程数为 2。日志的摘录如下所示。请注意从正确线程发出的互斥日志...

---> LOG FROM OSX ...

 ---------------------------------
 Number of data: 2
Data: 0, links: 
Data: 1, links: 

---> OF_SmartThreads::init_function --
 ------------------------------------
 --> 8 processors/cores detected.
 --> n_max_threads = 2
 ------------------------------------
                Mutex R0 locking from thread master 
                Mutex R0 locked from thread master 
                Mutex R0 try locking from thread master 
            OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
                Mutex A0 unlocking from thread master 
                Mutex A0 unlocked from thread master 
 New thread 0 created 
                Mutex R1 locking from thread master 
                Mutex R1 locked from thread master 
                Mutex R1 try locking from thread master 
            OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
                Mutex A1 unlocking from thread master 
                Mutex A1 unlocked from thread master 
 New thread 1 created 
 ---------------------------------
Available threads: 2
Unlocked datasets: 2


---> OF_SmartThreads::process_data_function

                Mutex A1 unlocking from thread1
                Mutex A1 unlocked from thread1
>>>>>> Thread 1 released available mutex...
                Mutex R1 locking from thread1
                Mutex A0 unlocking from thread0
                Mutex A0 unlocked from thread0
>>>>>> Thread 0 released available mutex...
                Mutex R0 locking from thread0

 UNLOCKED DATASETS : 0 1 
 LOCKED DATASETS   : 
 ACTIVE DATASETS   : 
 RUNNING THREADS   : 
    OF_SMART_THREADS: THREADS AVAILABLE
    Number of available threads: 2
    OF_SMART_THREADS: take the thread 0
    OF_SMART_THREADS: Thread master try to lock available mutex... 0
                Mutex A0 locking from thread master 
                Mutex A0 locked from thread master 
    OF_SMART_THREADS: Thread obtained available mutex... 0
    OF_SMART_THREADS: Thread try to unlock running mutex... 0
                Mutex R0 unlocking from thread master 
                Mutex R0 unlocked from thread master 
    OF_SMART_THREADS: Thread released running mutex... 0

            OF_SMART_THREADS: PREPARE AVAILABLE THREADS
                Mutex R1 try locking from thread master 
            OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
                Mutex A1 unlocking from thread master 
                Mutex A1 unlocked from thread master 
            OF_SMART_THREADS: Thread 1 released available mutex...

 UNLOCKED DATASETS : 1 
 LOCKED DATASETS   : 
 ACTIVE DATASETS   : 0 
 RUNNING THREADS   : 0->0 
                Mutex R0 locked from thread0
>>>>>> Thread 0 obtained running mutex...
>>>>>> Thread 0 is going to process the dataset 0
>>>>>> Thread 0 terminated to process the dataset 0
>>>>>> Thread 0 notified that evaluation terminated
                Mutex R0 unlocking from thread0
                Mutex R0 unlocked from thread0
>>>>>> Thread 0 released running mutex...
                Mutex A0 locking from thread0
    OF_SMART_THREADS: THREADS AVAILABLE
    Number of available threads: 1
    OF_SMART_THREADS: take the thread 1
    OF_SMART_THREADS: Thread master try to lock available mutex... 1
                Mutex A1 locking from thread master 
                Mutex A1 locked from thread master 
    OF_SMART_THREADS: Thread obtained available mutex... 1
    OF_SMART_THREADS: Thread try to unlock running mutex... 1
                Mutex R1 unlocking from thread master 
                Mutex R1 unlocked from thread master 
    OF_SMART_THREADS: Thread released running mutex... 1

        OF_SMART_THREADS: CHECK THREADS DONE
        ------------> DATASETS 0 done...
        ------------> DATASETS 0 removed from the active set. 

            OF_SMART_THREADS: PREPARE AVAILABLE THREADS
                Mutex R0 try locking from thread master 
                Mutex R0 try locked from thread master 
            OF_SMART_THREADS: Thread 0 obtained running mutex...
                Mutex R1 locked from thread1
                Mutex A0 unlocking from thread master 
>>>>>> Thread 1 obtained running mutex...
                Mutex A0 unlocked from thread master 
>>>>>> Thread 1 is going to process the dataset 1
                Mutex A0 locked from thread0
            OF_SMART_THREADS: Thread 0 released available mutex...
>>>>>> Thread 0 obtained available mutex...

 UNLOCKED DATASETS : 
 LOCKED DATASETS   : 
 ACTIVE DATASETS   : 1 
 RUNNING THREADS   : 1->1 
>>>>>> Thread 1 terminated to process the dataset 1
                Mutex A0 unlocking from thread0
>>>>>> Thread 1 notified that evaluation terminated
                Mutex A0 unlocked from thread0
                Mutex R1 unlocking from thread1
>>>>>> Thread 0 released available mutex...
                Mutex R1 unlocked from thread1
                Mutex R0 locking from thread0
>>>>>> Thread 1 released running mutex...
                Mutex A1 locking from thread1

        OF_SMART_THREADS: CHECK THREADS DONE
        ------------> DATASETS 1 done...
        ------------> DATASETS 1 removed from the active set. 

            OF_SMART_THREADS: PREPARE AVAILABLE THREADS
                Mutex R0 try locking from thread master 
            OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
                Mutex A0 unlocking from thread master 
                Mutex A0 unlocked from thread master 
            OF_SMART_THREADS: Thread 0 released available mutex...
                Mutex R1 try locking from thread master 
                Mutex R1 try locked from thread master 
            OF_SMART_THREADS: Thread 1 obtained running mutex...
                Mutex A1 unlocking from thread master 
                Mutex A1 unlocked from thread master 
            OF_SMART_THREADS: Thread 1 released available mutex...

OF_SMART_THREADS: ALL THE DATASETS HAS BEEN SUCCESFULLY PROCESSED...


                Mutex A1 locked from thread1
                Mutex R0 unlocking from thread master 
>>>>>> Thread 1 obtained available mutex...
                Mutex R0 unlocked from thread master 
                Mutex R0 locked from thread0
                Mutex A1 unlocking from thread1
>>>>>> Thread 0 obtained running mutex...
                Mutex A1 unlocked from thread1
>>>>>> Thread 0 notified that evaluation terminated
>>>>>> Thread 1 released available mutex...
                Mutex R0 unlocking from thread0
                Mutex R1 locking from thread1
                Mutex R0 unlocked from thread0
>>>>>> Thread 0 released running mutex...
                Mutex A0 locking from thread0
                Mutex A0 locked from thread0
>>>>>> Thread 0 obtained available mutex...
                Mutex A0 unlocking from thread0
                Mutex A0 unlocked from thread0
>>>>>> Thread 0 is terminating...
                Mutex R1 unlocking from thread master 
                Mutex R1 unlocked from thread master 
                Mutex R1 locked from thread1
>>>>>> Thread 1 obtained running mutex...
>>>>>> Thread 1 notified that evaluation terminated
                Mutex R1 unlocking from thread1
                Mutex R1 unlocked from thread1
>>>>>> Thread 1 released running mutex...
                Mutex A1 locking from thread1
                Mutex A1 locked from thread1
>>>>>> Thread 1 obtained available mutex...
                Mutex A1 unlocking from thread1
                Mutex A1 unlocked from thread1
>>>>>> Thread 1 is terminating...

在使用 Visual Studio 64 位和 Mingw 32 位的 Windows 7 上编译系统时会出现问题。从日志中可以看出,一开始就有死锁。这在我们看来很奇怪,不能用来自不同线程的互斥日志来解释。关于如何调试此问题的一些建议?

---> LOG FROM WINDOWS 7...


 ---------------------------------
 Number of data: 2
Data: 0, links:
Data: 1, links:

-———&gt; OF_SmartThreads::init_function --
 ------------------------------------
 --> 4 processors/cores detected.
 --> n_max_threads = 2
 ------------------------------------
                                Mutex R0 locking from thread master
                                Mutex R0 locked from thread master
                                Mutex R0 try locking from thread master
                        OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
                                Mutex A0 unlocking from thread master
                                Mutex A0 unlocked from thread master
 New thread 0 created
                                Mutex A0 unlocking from thread0
                                Mutex A0 unlocked from thread0
                                Mutex R1 locking from thread master
                                Mutex R1 locked from thread master
>>>>>> Thread 0 released available mutex...
                                Mutex R0 locking from thread0
                                Mutex R1 try locking from thread master
                        OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
                                Mutex A1 unlocking from thread master
                                Mutex A1 unlocked from thread master
 New thread 1 created
                                Mutex A1 unlocking from thread1
                                Mutex A1 unlocked from thread1
 ---------------------------------
Available threads: 2
>>>>>> Thread 1 released available mutex...
                                Mutex R1 locking from thread1
Unlocked datasets: 2

---> OF_SmartThreads::process_data_function


 UNLOCKED DATASETS : 0 1
 LOCKED DATASETS   :
 ACTIVE DATASETS   :
 RUNNING THREADS   :
        OF_SMART_THREADS: THREADS AVAILABLE
        Number of available threads: 2
        OF_SMART_THREADS: take the thread 0
        OF_SMART_THREADS: Thread master try to lock available mutex... 0
                                Mutex A0 locking from thread master
                                Mutex A0 locked from thread master
        OF_SMART_THREADS: Thread obtained available mutex... 0
        OF_SMART_THREADS: Thread try to unlock running mutex... 0
                                Mutex R0 unlocking from thread master
                                Mutex R0 unlocked from thread master
                                Mutex R0 locked from thread0
        OF_SMART_THREADS: Thread released running mutex... 0
>>>>>> Thread 0 obtained running mutex...
>>>>>> Thread 0 is going to process the dataset 0
 Process Data: delay 41

                        OF_SMART_THREADS: PREPARE AVAILABLE THREADS
>>>>>> Thread 0 terminated to process the dataset 0
>>>>>> Thread 0 notified that evaluation terminated
                                Mutex R1 try locking from thread master
                        OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
                                Mutex A1 unlocking from thread master
                                Mutex A1 unlocked from thread master
                                Mutex R0 unlocking from thread0
                                Mutex R0 unlocked from thread0
                        OF_SMART_THREADS: Thread 1 released available mutex...

 UNLOCKED DATASETS : 1
 LOCKED DATASETS   :
 ACTIVE DATASETS   : 0*
 RUNNING THREADS   : 0->0
>>>>>> Thread 0 released running mutex...
                                Mutex A0 locking from thread0
        OF_SMART_THREADS: THREADS AVAILABLE
        Number of available threads: 1
        OF_SMART_THREADS: take the thread 1
        OF_SMART_THREADS: Thread master try to lock available mutex... 1
                                Mutex A1 locking from thread master

存在死锁,线程主控无法锁定互斥锁 A1,但从日志中可以看出,之前没有其他线程锁定该互斥锁。有关如何调试此问题的一些建议?

问候

4

1 回答 1

1

将锁定监控添加到您OF_bmutexbool locked. 您不应该解锁未锁定的互斥锁,或锁定锁定的互斥锁 - 所以放置assert。似乎您init_function没有OF_AVAILABLE_THREADS_MUTEX[i]->unlock();事先锁定。

Boost BasicLockable 概念

m.unlock();

要求:当前线程拥有 m

所以看起来你违反了unlock()先决条件。这可以在您的日志中看到:

Mutex A0 unlocking from thread master 
Mutex A0 unlocked from thread master 
于 2013-10-28T12:12:35.813 回答