我在从主线程终止工作线程时遇到问题。到目前为止,我尝试的每种方法都会导致竞争条件或死锁。
工作线程存储在一个名为 ThreadPool 的类中的内部类中,ThreadPool 使用 unique_ptr 维护这些 WorkerThread 的向量。
这是我的 ThreadPool 的标题:
class ThreadPool
{
public:
typedef void (*pFunc)(const wpath&, const Args&, Global::mFile_t&, std::mutex&, std::mutex&); // function to point to
private:
class WorkerThread
{
private:
ThreadPool* const _thisPool; // reference enclosing class
// pointers to arguments
wpath _pPath; // member argument that will be modifyable to running thread
Args * _pArgs;
Global::mFile_t * _pMap;
// flags for thread management
bool _terminate; // terminate thread
bool _busy; // is thread busy?
bool _isRunning;
// thread management members
std::mutex _threadMtx;
std::condition_variable _threadCond;
std::thread _thisThread;
// exception ptr
std::exception_ptr _ex;
// private copy constructor
WorkerThread(const WorkerThread&): _thisPool(nullptr) {}
public:
WorkerThread(ThreadPool&, Args&, Global::mFile_t&);
~WorkerThread();
void setPath(const wpath); // sets a new task
void terminate(); // calls terminate on thread
bool busy() const; // returns whether thread is busy doing task
bool isRunning() const; // returns whether thread is still running
void join(); // thread join wrapper
std::exception_ptr exception() const;
// actual worker thread running tasks
void thisWorkerThread();
};
// thread specific information
DWORD _numProcs; // number of processors on system
unsigned _numThreads; // number of viable threads
std::vector<std::unique_ptr<WorkerThread>> _vThreads; // stores thread pointers - workaround for no move constructor in WorkerThread
pFunc _task; // the task threads will call
// synchronization members
unsigned _barrierLimit; // limit before barrier goes down
std::mutex _barrierMtx; // mutex for barrier
std::condition_variable _barrierCond; // condition for barrier
std::mutex _coutMtx;
public:
// argument mutex
std::mutex matchesMap_mtx;
std::mutex coutMatch_mtx;
ThreadPool(pFunc f);
// wake a thread and pass it a new parameter to work on
void callThread(const wpath&);
// barrier synchronization
void synchronizeStartingThreads();
// starts and synchronizes all threads in a sleep state
void startThreads(Args&, Global::mFile_t&);
// terminate threads
void terminateThreads();
private:
};
到目前为止,我遇到的真正问题是,从主线程调用 terminateThreads() 会导致死锁或竞争条件。
当我将 _terminate 标志设置为 true 时,主线程可能已经退出作用域并在线程有机会唤醒和终止之前销毁所有互斥锁。事实上,我已经多次遇到这种崩溃(控制台窗口显示:忙时互斥体被破坏)
如果我在 notify_all() 线程之后添加 thread.join(),则线程有可能在连接发生之前终止,从而导致无限死锁,因为加入已终止的线程会无限期地挂起程序。
如果我分离 - 与上述相同的问题,但会导致程序崩溃
如果我改为使用 while(WorkerThread.isRunning()) Sleep(0); 程序可能会崩溃,因为主线程可能在 WorkerThread 到达最后一个右括号之前退出。
在所有工作线程安全终止之前,我不确定还可以做什么来停止主线程。此外,即使在 thread 和 main 中使用 try-catch,也不会捕获任何异常。(我尝试过的一切都会导致程序崩溃)
在工作线程完成之前,我该怎么做才能停止主线程?
以下是主要功能的实现:
终止单个工作线程
void ThreadPool::WorkerThread::terminate()
{
_terminate = true;
_threadCond.notify_all();
_thisThread.join();
}
实际的线程循环
void ThreadPool::WorkerThread::thisWorkerThread()
{
_thisPool->synchronizeStartingThreads();
try
{
while (!_terminate)
{
{
_thisPool->_coutMtx.lock();
std::cout << std::this_thread::get_id() << " Sleeping..." << std::endl;
_thisPool->_coutMtx.unlock();
_busy = false;
std::unique_lock<std::mutex> lock(_threadMtx);
_threadCond.wait(lock);
}
_thisPool->_coutMtx.lock();
std::cout << std::this_thread::get_id() << " Awake..." << std::endl;
_thisPool->_coutMtx.unlock();
if(_terminate)
break;
_thisPool->_task(_pPath, *_pArgs, *_pMap, _thisPool->coutMatch_mtx, _thisPool->matchesMap_mtx);
_thisPool->_coutMtx.lock();
std::cout << std::this_thread::get_id() << " Finished Task..." << std::endl;
_thisPool->_coutMtx.unlock();
}
_thisPool->_coutMtx.lock();
std::cout << std::this_thread::get_id() << " Terminating" << std::endl;
_thisPool->_coutMtx.unlock();
}
catch (const std::exception&)
{
_ex = std::current_exception();
}
_isRunning = false;
}
终止所有工作线程
void ThreadPool::terminateThreads()
{
for (std::vector<std::unique_ptr<WorkerThread>>::iterator it = _vThreads.begin(); it != _vThreads.end(); ++it)
{
it->get()->terminate();
//it->get()->_thisThread.detach();
// if thread threw an exception, rethrow it in main
if (it->get()->exception() != nullptr)
std::rethrow_exception(it->get()->exception());
}
}
最后,调用线程池的函数(扫描函数在 main 上运行)
// scans a path recursively for all files of selected extension type, calls thread to parse file
unsigned int Functions::Scan(wpath path, const Args& args, ThreadPool& pool)
{
wrecursive_directory_iterator d(path), e;
unsigned int filesFound = 0;
while ( d != e )
{
if (args.verbose())
std::wcout << L"Grepping: " << d->path().string() << std::endl;
for (Args::ext_T::const_iterator it = args.extension().cbegin(); it != args.extension().cend(); ++it)
{
if (extension(d->path()) == *it)
{
++filesFound;
pool.callThread(d->path());
}
}
++d;
}
std::cout << "Scan Function: Calling TerminateThreads() " << std::endl;
pool.terminateThreads();
std::cout << "Scan Function: Called TerminateThreads() " << std::endl;
return filesFound;
}
我将再次重复这个问题:在工作线程完成之前,我能做些什么来停止主线程?