我们正在开发一个通用线程类,用于处理一组相互关联的数据集。基本思路是:只把彼此没有关联、因而可以同时处理的数据集分配到不同的线程中进行计算。为此,我们基于 boost::thread 实现了 OF_ThreadClass,并封装 boost::mutex 得到 OF_bmutex 类,以便在加锁和解锁时输出日志。
代码的整体方案见链接中的 PDF(http://cdm.unimore.it/dep/test.pd),主要类的骨架如下:
/// Logging wrapper around boost::mutex.
///
/// Each operation writes one line to std::cout before forwarding to the
/// wrapped mutex and one line after it returns (for try_lock, the second
/// line is written only when the lock was obtained). Every line names the
/// mutex as mutex_type followed by m_id and ends with the calling thread's id.
class OF_bmutex{
public:
    std::string mutex_type;   ///< Label printed in front of the numeric id.
    int m_id;                 ///< Numeric id printed right after the label.
    boost::mutex m;           ///< The mutex all calls are forwarded to.

    /// Log, acquire the wrapped mutex (blocking), then log again.
    void lock(){
        trace( " locking from " );
        m.lock();
        trace( " locked from " );
    }

    /// Log, release the wrapped mutex, then log again.
    void unlock(){
        trace( " unlocking from " );
        m.unlock();
        trace( " unlocked from " );
    }

    /// Log, attempt a non-blocking acquire, and log once more on success.
    /// @return the result of boost::mutex::try_lock().
    bool try_lock(){
        trace( " try locking from " );
        const bool acquired = m.try_lock();
        if( acquired ){
            trace( " try locked from " );
        }
        return acquired;
    }

private:
    /// Emit one log line: "Mutex", the mutex name, @p what, the caller's thread id.
    void trace( const char *what ) const {
        std::cout << "Mutex " << mutex_type << m_id << what << boost::this_thread::get_id() << std::endl;
    }
};
/// Worker object whose operator() is the body executed by one pool thread.
///
/// The class is meant to be specialised: processData() is virtual, so the
/// destructor is virtual as well. Without that, deleting a derived worker
/// through an OF_ThreadClass* would be undefined behaviour.
class OF_ThreadClass {
private:
    //! Loop flag tested by operator()(); the worker loop ends when it is false.
    //! NOTE(review): a plain bool -- if another thread writes it, that access
    //! is unsynchronised; confirm how it is set.
    bool running;
    //! The thread executing this process...
    boost::thread *m_thread;
    //! The data to process (may be NULL; operator()() checks before use).
    LinkedDataSet *my_data;
    //! The id of this thread; operator()() uses it to index the global mutex vectors.
    int thread_id;
    //! Process the data attached to this worker; overridable by subclasses.
    virtual int processData();
public:
    //! The boost thread id
    boost::thread::id boost_id;
    //! Thread function
    void operator () ();
    //! Default constructor
    OF_ThreadClass();
    //! Destructor (virtual because the class has a virtual member function).
    virtual ~OF_ThreadClass();
    //! Connect this thread with the process data to evaluate
    void setProcessData( DataToProcess *pd );
    //! return the thread id
    int getId() const { return this->thread_id; }
    //! post process the thread...
    void post_process();
};
/// Thread entry point.
///
/// While `running` is true, each iteration releases this worker's
/// "available" mutex, blocks on its "running" mutex, processes the attached
/// data set (if any) and marks it done, then releases the "running" mutex
/// and blocks on the "available" mutex again. On exit the "available" mutex
/// is released one final time.
void OF_ThreadClass::operator () (){
    while( this->running ){
        // NOTE(review): on the first iteration no lock() of this mutex by this
        // thread is visible in this function. Boost.Thread documents unlock()
        // as requiring that the calling thread owns the mutex; releasing a
        // mutex that is unowned or owned by another thread is undefined and a
        // plausible cause of platform-dependent deadlocks. A condition
        // variable would express this hand-shake safely -- confirm and revisit.
        OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock();
        // Blocks until the running mutex for this worker can be acquired.
        OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->lock();
        // PUT HERE OUR CODE...
        // `running` is re-tested because it may have changed while blocked above.
        if( running == true ){
            if( my_data != NULL ){
                this->processData();
                // Mark completion only when a data set is attached; writing
                // through my_data outside this guard would dereference NULL.
                this->my_data->done = true;
            }
        }
        std::cout << ">>>>>> Thread " << thread_id << " notified that evaluation terminated\n";
        OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->unlock();
        OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->lock();
    }
    OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock();
}
// Scheduler for multithreaded calculation: holds the pool of OF_ThreadClass
// workers and the sets used to track threads and LinkedDataSet objects.
class OF_SmartThreads{
private:
//! The number of threads to activate (loop bound used by init_function)
int _n_threads;
//! The polling time
//! NOTE(review): units are not shown here; process_data_polling feeds its argument to a millisecond sleep -- confirm this member holds the same value
int _polling_time;
//! The thread pool; init_function stores each worker at the index returned by its getId()
std::vector< OF_ThreadClass *> threadPool;
//! The set of the available threads (held through a pointer; its allocation is not visible in this excerpt)
std::set< OF_ThreadClass *> *OF_AVAILABLE_THREADS;
//! The set of the running threads
//! NOTE(review): declared by value here, while process_data_polling accesses OF_RUNNING_THREADS with '->' -- confirm which is intended
std::set< OF_ThreadClass *> OF_RUNNING_THREADS;
//! The set of the locked datasets
std::set< LinkedDataSet* > locked_data;
//! The set of the available datasets
std::set< LinkedDataSet* > unlocked_data;
//! The set of the datasets under processing
std::set< LinkedDataSet* > processing_data;
//! The size of the progress bar
int progBarDim;
public:
//! Constructor
OF_SmartThreads();
//! Destructor
~OF_SmartThreads();
//! Initialize the SmartThreads from a list of datasets, with an upper bound on the number of threads
int init_function( std::list< LinkedDataSet * > *dList, int n_max_threads);
//! Initialize the SmartThreads from a set of datasets, with an upper bound on the number of threads
int init_function( std::set< LinkedDataSet * > *dSet, int n_max_threads);
//! Process all the concurrent threads..
int process_data();
//! Process all the concurrent threads, polling their status every polling_time
int process_data_polling( int polling_time );
//! stop the process..
int post_process();
};
// Initialization function (skeleton): creates one "available" (A) and one
// "running" (R) logging mutex per thread, then creates the worker objects and
// registers them in the pool and in the set of available threads.
// NOTE(review): the parameter list and the declarations of _tm and pc are
// elided in this excerpt, and no return statement is visible although the
// function is declared to return int.
int OF_SmartThreads::init_function( ... ){
// in the main thread...
// Fill the pool of thread mutexes: index i of each global vector belongs to thread i
for(int i = 0; i< _n_threads; i++ ){
_tm = new OF_BMUTEX;
_tm->mutex_type.assign( "A" );
_tm->m_id = i;
OF_AVAILABLE_THREADS_MUTEX.push_back( _tm );
_tm = new OF_BMUTEX;
_tm->mutex_type.assign( "R" );
_tm->m_id = i;
OF_RUNNING_THREADS_MUTEX.push_back( _tm );
}
// Create the threads...
threadPool.resize( _n_threads );
for(int i = 0; i< _n_threads; i++ ){
// ...lock the running mutex in advance, so that the worker blocks on it until it is released...
OF_RUNNING_THREADS_MUTEX[i]->lock();
// NOTE(review): no earlier lock() of this mutex is visible in this function.
// Boost.Thread requires the caller of unlock() to own the mutex, so this
// call looks like undefined behaviour on a freshly created mutex -- confirm.
OF_AVAILABLE_THREADS_MUTEX[i]->unlock();
// ..create the new thread...
// (presumably the constructor assigns the id and starts the boost::thread -- not shown here)
pc = new OF_ThreadClass;
// insert the new thread in the list, at the index given by its own id...
threadPool.at( pc->getId() ) = pc;
// set it as available...
OF_AVAILABLE_THREADS->insert( pc );
}
}
// Execution function (skeleton): polling loop that, once per _polling_time
// milliseconds, hands one unlocked dataset to one available thread, collects
// the datasets whose threads have finished, and keeps going while any locked,
// unlocked or active dataset remains.
// NOTE(review): declarations of running, something_changed, pd, pc, ret and
// the iterators are not visible in this excerpt; the class declares this
// function as an int-returning member -- confirm the real signature.
void process_data_polling( int _polling_time ){
while ( running ){
if ( something_changed ){
//Print the status on the screen...
...
}
something_changed = false;
// Poll the status of the processing data periodically
boost::this_thread::sleep(boost::posix_time::millisec( _polling_time ));
// Are there some data ready to process?
if( OF_UNLOCKED_DATASETS->size() > 0 ){
// Take the first
pd = *OF_UNLOCKED_DATASETS->begin();
// are there some threads available?
if( OF_AVAILABLE_THREADS->size() != 0 ){
//...lock and move the datasets linked to pd...
ret = lock_data( pd, LOCK );
std::cout << "\tNumber of available threads: " << OF_AVAILABLE_THREADS->size() << std::endl;
// Take the available thread...
pc = *OF_AVAILABLE_THREADS->begin();
// ...link it the dataset to process...
pc->setProcess( pd );
// Hand-shake with the worker: take its available mutex, then release its
// running mutex so that the worker's blocked lock() on it can proceed.
OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->lock();
OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->unlock();
something_changed = true;
} // available threads
} // unlock datasets
// Find, unlock and remove finished datasets...
pIter2 = OF_RUNNING_THREADS->begin();
pEnd2 = OF_RUNNING_THREADS->end();
while( pIter2 != pEnd2 ){
// advance the iterator before the current thread is (presumably) moved out of the running set
pc = *pIter2++;
pd = pc->getDataSet();
if( pd->isDone() ){
//...unlock and move the datasets linked to the current dataset...
ret_move = lock_data( pd, RELEASE_LOCK );
//...remove the data from the active set
ret_remove = OF_ACTIVE_DATASETS->erase( pd );
// make the threads available
moveThreads( pc, _RUNNING_, _AVAILABLE_ );
something_changed = true;
}
}
// Prepare every available thread for its next job: try to re-take its
// running mutex and release its available mutex.
pIter2 = OF_AVAILABLE_THREADS->begin();
pEnd2 = OF_AVAILABLE_THREADS->end();
while( pIter2 != pEnd2 ){
pc = *pIter2++;
bool obtained = OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->try_lock();
if( obtained ){
std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " obtained running mutex..." << std::endl;
}
else{
std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " failed to obtain running mutex..." << std::endl;
}
// NOTE(review): this unlock runs for every available thread on every poll,
// whether or not this thread currently holds that mutex (the only lock() of
// it visible here is in the dispatch branch above). Boost.Thread requires
// the caller of unlock() to own the mutex; repeated unlocks of an unowned
// mutex are undefined and a likely source of the platform-specific
// deadlock -- confirm.
OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->unlock();
std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " released available mutex..." << std::endl;
}
// Keep polling while any dataset is still locked, unlocked or active.
if( ( OF_LOCKED_DATASETS->size() + OF_UNLOCKED_DATASETS->size() + OF_ACTIVE_DATASETS->size() ) > 0 ){
running = true;
}
else{
running = false;
}
} // end running...
}
// The main function (skeleton): initialise the scheduler, run the polling
// loop, then call lc.post_process().
// NOTE(review): `data` and `lc` are not declared in this excerpt, and the
// return value of init_function is ignored.
int main( int argc, char* argv[]) {
// INT_MAX is passed as the maximum number of threads (presumably clamped inside init_function -- confirm)
init_function( &data, INT_MAX );
// 100 is the polling time; process_data_polling passes it to a millisecond sleep
process_data_polling( 100 );
lc.post_process();
return 0;
}
在 Linux 和 OSX 上使用 boost 1.53 编译时,程序运行完全正常。使用的线程数为 2,日志摘录如下。请注意,其中的互斥锁日志都是由正确的线程发出的……
---> LOG FROM OSX ...
---------------------------------
Number of data: 2
Data: 0, links:
Data: 1, links:
---> OF_SmartThreads::init_function --
------------------------------------
--> 8 processors/cores detected.
--> n_max_threads = 2
------------------------------------
Mutex R0 locking from thread master
Mutex R0 locked from thread master
Mutex R0 try locking from thread master
OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
Mutex A0 unlocking from thread master
Mutex A0 unlocked from thread master
New thread 0 created
Mutex R1 locking from thread master
Mutex R1 locked from thread master
Mutex R1 try locking from thread master
OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
New thread 1 created
---------------------------------
Available threads: 2
Unlocked datasets: 2
---> OF_SmartThreads::process_data_function
Mutex A1 unlocking from thread1
Mutex A1 unlocked from thread1
>>>>>> Thread 1 released available mutex...
Mutex R1 locking from thread1
Mutex A0 unlocking from thread0
Mutex A0 unlocked from thread0
>>>>>> Thread 0 released available mutex...
Mutex R0 locking from thread0
UNLOCKED DATASETS : 0 1
LOCKED DATASETS :
ACTIVE DATASETS :
RUNNING THREADS :
OF_SMART_THREADS: THREADS AVAILABLE
Number of available threads: 2
OF_SMART_THREADS: take the thread 0
OF_SMART_THREADS: Thread master try to lock available mutex... 0
Mutex A0 locking from thread master
Mutex A0 locked from thread master
OF_SMART_THREADS: Thread obtained available mutex... 0
OF_SMART_THREADS: Thread try to unlock running mutex... 0
Mutex R0 unlocking from thread master
Mutex R0 unlocked from thread master
OF_SMART_THREADS: Thread released running mutex... 0
OF_SMART_THREADS: PREPARE AVAILABLE THREADS
Mutex R1 try locking from thread master
OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
OF_SMART_THREADS: Thread 1 released available mutex...
UNLOCKED DATASETS : 1
LOCKED DATASETS :
ACTIVE DATASETS : 0
RUNNING THREADS : 0->0
Mutex R0 locked from thread0
>>>>>> Thread 0 obtained running mutex...
>>>>>> Thread 0 is going to process the dataset 0
>>>>>> Thread 0 terminated to process the dataset 0
>>>>>> Thread 0 notified that evaluation terminated
Mutex R0 unlocking from thread0
Mutex R0 unlocked from thread0
>>>>>> Thread 0 released running mutex...
Mutex A0 locking from thread0
OF_SMART_THREADS: THREADS AVAILABLE
Number of available threads: 1
OF_SMART_THREADS: take the thread 1
OF_SMART_THREADS: Thread master try to lock available mutex... 1
Mutex A1 locking from thread master
Mutex A1 locked from thread master
OF_SMART_THREADS: Thread obtained available mutex... 1
OF_SMART_THREADS: Thread try to unlock running mutex... 1
Mutex R1 unlocking from thread master
Mutex R1 unlocked from thread master
OF_SMART_THREADS: Thread released running mutex... 1
OF_SMART_THREADS: CHECK THREADS DONE
------------> DATASETS 0 done...
------------> DATASETS 0 removed from the active set.
OF_SMART_THREADS: PREPARE AVAILABLE THREADS
Mutex R0 try locking from thread master
Mutex R0 try locked from thread master
OF_SMART_THREADS: Thread 0 obtained running mutex...
Mutex R1 locked from thread1
Mutex A0 unlocking from thread master
>>>>>> Thread 1 obtained running mutex...
Mutex A0 unlocked from thread master
>>>>>> Thread 1 is going to process the dataset 1
Mutex A0 locked from thread0
OF_SMART_THREADS: Thread 0 released available mutex...
>>>>>> Thread 0 obtained available mutex...
UNLOCKED DATASETS :
LOCKED DATASETS :
ACTIVE DATASETS : 1
RUNNING THREADS : 1->1
>>>>>> Thread 1 terminated to process the dataset 1
Mutex A0 unlocking from thread0
>>>>>> Thread 1 notified that evaluation terminated
Mutex A0 unlocked from thread0
Mutex R1 unlocking from thread1
>>>>>> Thread 0 released available mutex...
Mutex R1 unlocked from thread1
Mutex R0 locking from thread0
>>>>>> Thread 1 released running mutex...
Mutex A1 locking from thread1
OF_SMART_THREADS: CHECK THREADS DONE
------------> DATASETS 1 done...
------------> DATASETS 1 removed from the active set.
OF_SMART_THREADS: PREPARE AVAILABLE THREADS
Mutex R0 try locking from thread master
OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
Mutex A0 unlocking from thread master
Mutex A0 unlocked from thread master
OF_SMART_THREADS: Thread 0 released available mutex...
Mutex R1 try locking from thread master
Mutex R1 try locked from thread master
OF_SMART_THREADS: Thread 1 obtained running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
OF_SMART_THREADS: Thread 1 released available mutex...
OF_SMART_THREADS: ALL THE DATASETS HAS BEEN SUCCESFULLY PROCESSED...
Mutex A1 locked from thread1
Mutex R0 unlocking from thread master
>>>>>> Thread 1 obtained available mutex...
Mutex R0 unlocked from thread master
Mutex R0 locked from thread0
Mutex A1 unlocking from thread1
>>>>>> Thread 0 obtained running mutex...
Mutex A1 unlocked from thread1
>>>>>> Thread 0 notified that evaluation terminated
>>>>>> Thread 1 released available mutex...
Mutex R0 unlocking from thread0
Mutex R1 locking from thread1
Mutex R0 unlocked from thread0
>>>>>> Thread 0 released running mutex...
Mutex A0 locking from thread0
Mutex A0 locked from thread0
>>>>>> Thread 0 obtained available mutex...
Mutex A0 unlocking from thread0
Mutex A0 unlocked from thread0
>>>>>> Thread 0 is terminating...
Mutex R1 unlocking from thread master
Mutex R1 unlocked from thread master
Mutex R1 locked from thread1
>>>>>> Thread 1 obtained running mutex...
>>>>>> Thread 1 notified that evaluation terminated
Mutex R1 unlocking from thread1
Mutex R1 unlocked from thread1
>>>>>> Thread 1 released running mutex...
Mutex A1 locking from thread1
Mutex A1 locked from thread1
>>>>>> Thread 1 obtained available mutex...
Mutex A1 unlocking from thread1
Mutex A1 unlocked from thread1
>>>>>> Thread 1 is terminating...
而在 Windows 7 上使用 Visual Studio(64 位)或 MinGW(32 位)编译时就会出现问题。从日志中可以看出,程序刚开始运行就发生了死锁。这让我们感到很奇怪,因为从各个线程输出的互斥锁日志中找不到原因。对于如何调试这个问题,有什么建议吗?
---> LOG FROM WINDOWS 7...
---------------------------------
Number of data: 2
Data: 0, links:
Data: 1, links:
-———> OF_SmartThreads::init_function --
------------------------------------
--> 4 processors/cores detected.
--> n_max_threads = 2
------------------------------------
Mutex R0 locking from thread master
Mutex R0 locked from thread master
Mutex R0 try locking from thread master
OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
Mutex A0 unlocking from thread master
Mutex A0 unlocked from thread master
New thread 0 created
Mutex A0 unlocking from thread0
Mutex A0 unlocked from thread0
Mutex R1 locking from thread master
Mutex R1 locked from thread master
>>>>>> Thread 0 released available mutex...
Mutex R0 locking from thread0
Mutex R1 try locking from thread master
OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
New thread 1 created
Mutex A1 unlocking from thread1
Mutex A1 unlocked from thread1
---------------------------------
Available threads: 2
>>>>>> Thread 1 released available mutex...
Mutex R1 locking from thread1
Unlocked datasets: 2
---> OF_SmartThreads::process_data_function
UNLOCKED DATASETS : 0 1
LOCKED DATASETS :
ACTIVE DATASETS :
RUNNING THREADS :
OF_SMART_THREADS: THREADS AVAILABLE
Number of available threads: 2
OF_SMART_THREADS: take the thread 0
OF_SMART_THREADS: Thread master try to lock available mutex... 0
Mutex A0 locking from thread master
Mutex A0 locked from thread master
OF_SMART_THREADS: Thread obtained available mutex... 0
OF_SMART_THREADS: Thread try to unlock running mutex... 0
Mutex R0 unlocking from thread master
Mutex R0 unlocked from thread master
Mutex R0 locked from thread0
OF_SMART_THREADS: Thread released running mutex... 0
>>>>>> Thread 0 obtained running mutex...
>>>>>> Thread 0 is going to process the dataset 0
Process Data: delay 41
OF_SMART_THREADS: PREPARE AVAILABLE THREADS
>>>>>> Thread 0 terminated to process the dataset 0
>>>>>> Thread 0 notified that evaluation terminated
Mutex R1 try locking from thread master
OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
Mutex R0 unlocking from thread0
Mutex R0 unlocked from thread0
OF_SMART_THREADS: Thread 1 released available mutex...
UNLOCKED DATASETS : 1
LOCKED DATASETS :
ACTIVE DATASETS : 0*
RUNNING THREADS : 0->0
>>>>>> Thread 0 released running mutex...
Mutex A0 locking from thread0
OF_SMART_THREADS: THREADS AVAILABLE
Number of available threads: 1
OF_SMART_THREADS: take the thread 1
OF_SMART_THREADS: Thread master try to lock available mutex... 1
Mutex A1 locking from thread master
这里发生了死锁:主线程(master)无法锁定互斥锁 A1,但从日志中可以看出,此前并没有其他线程锁定过该互斥锁。对于如何调试这个问题,有什么建议吗?
问候