0

我在从主线程终止工作线程时遇到问题。到目前为止,我尝试的每种方法都会导致竞争条件或死锁。

工作线程存储在一个名为 ThreadPool 的类中的内部类中,ThreadPool 使用 unique_ptr 维护这些 WorkerThread 的向量。

这是我的 ThreadPool 的标题:

class ThreadPool
{
public:
typedef void (*pFunc)(const wpath&, const Args&, Global::mFile_t&, std::mutex&, std::mutex&);       // function to point to
private:

    class WorkerThread
    {
    private:
        ThreadPool* const _thisPool;        // reference enclosing class

        // pointers to arguments
        wpath _pPath;               // member argument that will be modifyable to running thread
        Args * _pArgs;
        Global::mFile_t * _pMap;

        // flags for thread management
        bool _terminate;                    // terminate thread
        bool _busy;                         // is thread busy?
        bool _isRunning;

        // thread management members

        std::mutex              _threadMtx;
        std::condition_variable _threadCond;
        std::thread             _thisThread;

        // exception ptr
        std::exception_ptr _ex;

        // private copy constructor
        WorkerThread(const WorkerThread&): _thisPool(nullptr) {}
    public:
        WorkerThread(ThreadPool&, Args&, Global::mFile_t&);
        ~WorkerThread();

        void setPath(const wpath);          // sets a new task
        void terminate();                   // calls terminate on thread
        bool busy() const;                  // returns whether thread is busy doing task
        bool isRunning() const;             // returns whether thread is still running
        void join();                        // thread join wrapper
        std::exception_ptr exception() const;

        // actual worker thread running tasks
        void thisWorkerThread();
    };

    // thread specific information
    DWORD _numProcs;                        // number of processors on system
    unsigned _numThreads;                   // number of viable threads
    std::vector<std::unique_ptr<WorkerThread>> _vThreads;   // stores thread pointers - workaround for no move constructor in WorkerThread
    pFunc _task;                            // the task threads will call

    // synchronization members
    unsigned _barrierLimit;                 // limit before barrier goes down
    std::mutex _barrierMtx;                 // mutex for barrier
    std::condition_variable _barrierCond;   // condition for barrier
    std::mutex _coutMtx;

public:
    // argument mutex
    std::mutex matchesMap_mtx;
    std::mutex coutMatch_mtx;

    ThreadPool(pFunc f);

    // wake a thread and pass it a new parameter to work on
    void callThread(const wpath&);

    // barrier synchronization
    void synchronizeStartingThreads();

    // starts and synchronizes all threads in a sleep state
    void startThreads(Args&, Global::mFile_t&);

    // terminate threads
    void terminateThreads();

private:
};

到目前为止,我遇到的真正问题是,从主线程调用 terminateThreads() 会导致死锁或竞争条件。

当我将 _terminate 标志设置为 true 时,主线程可能已经退出作用域并在线程有机会唤醒和终止之前销毁所有互斥锁。事实上,我已经多次遇到这种崩溃(控制台窗口显示:忙时互斥体被破坏)

如果我在 notify_all() 线程之后添加 thread.join(),则线程有可能在连接发生之前终止,从而导致无限死锁,因为加入已终止的线程会无限期地挂起程序。

如果我分离 - 与上述相同的问题,但会导致程序崩溃

如果我改为使用 while(WorkerThread.isRunning()) Sleep(0); 程序可能会崩溃,因为主线程可能在 WorkerThread 到达最后一个右括号之前退出。

在所有工作线程安全终止之前,我不确定还可以做什么来停止主线程。此外,即使在 thread 和 main 中使用 try-catch,也不会捕获任何异常。(我尝试过的一切都会导致程序崩溃)

在工作线程完成之前,我该怎么做才能停止主线程?

以下是主要功能的实现:

终止单个工作线程

void ThreadPool::WorkerThread::terminate()
{
    _terminate = true;
    _threadCond.notify_all();
    _thisThread.join();
}

实际的线程循环

void ThreadPool::WorkerThread::thisWorkerThread()
{
    _thisPool->synchronizeStartingThreads();

    try
    {
        while (!_terminate)
        {
            {
                _thisPool->_coutMtx.lock();
                std::cout << std::this_thread::get_id() << " Sleeping..." << std::endl;
                _thisPool->_coutMtx.unlock();
                _busy = false;
                std::unique_lock<std::mutex> lock(_threadMtx);
                _threadCond.wait(lock);
            }
            _thisPool->_coutMtx.lock();
            std::cout << std::this_thread::get_id() << " Awake..." << std::endl;
            _thisPool->_coutMtx.unlock();
            if(_terminate)
                break;

            _thisPool->_task(_pPath, *_pArgs, *_pMap, _thisPool->coutMatch_mtx, _thisPool->matchesMap_mtx);

            _thisPool->_coutMtx.lock();
            std::cout << std::this_thread::get_id() << " Finished Task..." << std::endl;
            _thisPool->_coutMtx.unlock();

        }
        _thisPool->_coutMtx.lock();
        std::cout << std::this_thread::get_id() << " Terminating" << std::endl;
        _thisPool->_coutMtx.unlock();   
    }
    catch (const std::exception&)
    {
        _ex = std::current_exception();
    }
    _isRunning = false;
}

终止所有工作线程

void ThreadPool::terminateThreads()
{
    for (std::vector<std::unique_ptr<WorkerThread>>::iterator it = _vThreads.begin(); it != _vThreads.end(); ++it)
    {
        it->get()->terminate();
        //it->get()->_thisThread.detach();

        // if thread threw an exception, rethrow it in main
        if (it->get()->exception() != nullptr)
            std::rethrow_exception(it->get()->exception());
    }
}

最后,调用线程池的函数(扫描函数在 main 上运行)

// scans a path recursively for all files of selected extension type, calls thread to parse file
unsigned int Functions::Scan(wpath path, const Args& args, ThreadPool& pool)
{
    wrecursive_directory_iterator d(path), e;
    unsigned int filesFound = 0;
    while ( d != e )
    {
        if (args.verbose())
            std::wcout << L"Grepping: " << d->path().string() << std::endl;

        for (Args::ext_T::const_iterator it = args.extension().cbegin(); it != args.extension().cend(); ++it)
        {
            if (extension(d->path()) == *it)
            {
                ++filesFound;
                pool.callThread(d->path());
            }
        }
        ++d;
    }

    std::cout << "Scan Function: Calling TerminateThreads() " << std::endl;
    pool.terminateThreads();
    std::cout << "Scan Function: Called TerminateThreads() " << std::endl;
    return filesFound;
}

我将再次重复这个问题:在工作线程完成之前,我能做些什么来停止主线程?

4

2 回答 2

1

我没有得到线程终止和加入的问题。

加入线程就是等待给定线程终止,所以这正是你想要做的。如果线程已经完成执行,join将立即返回。

因此,您只需在terminate调用期间加入每个线程,就像您在代码中所做的那样。

注意:目前,如果您刚刚终止的线程具有活动的exception_ptr. 这可能会导致未连接的线程。在处理这些异常时,您必须牢记这一点

更新:查看您的代码后,我看到了一个潜在的错误:std::condition_variable::wait()可以在发生虚假唤醒时返回。如果是这种情况,您将在上次工作的路径上再次工作,从而导致错误的结果。如果添加了新工作,您应该为新工作设置一个标志,并且该_threadCond.wait(lock)行应该在一个循环中检查标志和_terminate. 不过,不确定那是否会解决您的问题。

于 2013-06-17T06:24:21.947 回答
0

问题有两个方面:

synchronizeStartingThreads() 有时会阻塞 1 或 2 个线程,等待可以继续进行(while (some_condition) barrierCond.wait(lock) 中的问题。条件有时永远不会计算为真。删除 while 循环修复了这个问题阻塞问题。

第二个问题是工作线程有可能进入 _threadMtx,并且在它们进入 _threadCond.wait() 之前调用了 notify_all,因为已经调用了 notify,线程将永远等待。

IE。

{
    // terminate() is called
    std::unique_lock<std::mutex> lock(_threadMtx);
    // _threadCond.notify_all() is called here
    _busy = false;
    _threadCond.wait(lock);
    // thread is blocked forever
}

令人惊讶的是,将这个互斥锁锁定在 terminate() 中并没有阻止这种情况的发生。

这是通过向 _threadCond.wait() 添加 30 毫秒的超时来解决的

此外,在任务开始之前添加了一项检查,以确保不会再次处理相同的任务。

新代码现在如下所示:

thisWorkerThread

_threadCond.wait_for(lock, std::chrono::milliseconds(30));  // hold the lock a max of 30ms

// after the lock, and the termination check

if(_busy)
        {
            Global::mFile_t rMap = _thisPool->_task(_pPath, *_pArgs, _thisPool->coutMatch_mtx);
            _workerMap.element.insert(rMap.element.begin(), rMap.element.end());
        }
于 2013-06-17T10:18:01.157 回答