21

当需要销毁该类的对象时,关闭由 C++ 类管理的 Boost 线程的最佳方法是什么?我有一个类,它在构造时创建并启动一个线程,并提供一个公共Wake()方法,该方法在需要做一些工作时唤醒线程。该Wake()方法使用 Boost mutex 和 Boost 条件变量来向线程发出信号;线程过程等待条件变量,然后完成工作并返回等待。

目前,我在类的析构函数中关闭了这个线程,使用布尔成员变量作为“运行”标志;我清除标志,然后在条件变量上调用 notify_one()。然后线程过程唤醒,注意到“运行”为假,然后返回。这是代码:

class Worker
{
public:
    Worker();
    ~Worker();
    void Wake();
private:
    Worker(Worker const& rhs);             // prevent copying
    Worker& operator=(Worker const& rhs);  // prevent assignment
    void ThreadProc();
    bool m_Running;
    boost::mutex               m_Mutex;
    boost::condition_variable  m_Condition;
    boost::scoped_ptr<boost::thread> m_pThread;
};

Worker::Worker()
    : m_Running(true)
    , m_Mutex()
    , m_Condition()
    , m_pThread()
{
    m_pThread.reset(new boost::thread(boost::bind(&Worker::ThreadProc, this)));
}

Worker::~Worker()
{
    m_Running = false;
    m_Condition.notify_one();
    m_pThread->join();
}

void Worker::Wake()
{
    boost::lock_guard<boost::mutex> lock(m_Mutex);
    m_Condition.notify_one();
}

void Worker::ThreadProc()
{
    for (;;)
    {
        boost::unique_lock<boost::mutex> lock(m_Mutex);
        m_Condition.wait(lock);
        if (! m_Running) break;
        // do some work here
    }
}

像这样关闭类的析构函数中的线程是个好主意,还是我应该提供一个公共方法让用户在对象被销毁之前执行此操作,当有更多的错误处理和/或强行销毁线程的可能性时如果线程过程未能干净或及时返回?

在其析构函数中清理我的对象的混乱是很有吸引力的,因为它不需要用户对细节的关注(抽象,欢呼!)但在我看来,如果我能保证承担全部责任,我应该只在析构函数中做事成功而彻底地清理东西,并且类外的代码有一天可能需要知道线程是否被干净地关闭的可能性很小。

此外,我正在使用的机制——写入一个线程堆栈上的对象中的成员变量并在另一个线程中读取该变量——是否安全且理智?

4

1 回答 1

55

在类被销毁时释放类创建的资源是一个好主意,即使其中一个资源是线程。如果资源是通过用户调用显式创建的,例如Worker::Start(),那么还应该有一个显式的方式来释放它,例如Worker::Stop(). 如果用户不调用和/或为用户提供一个实现RAII -idiomWorker::Stop()的作用域辅助类,在其构造函数和析构函数中调用,则在析构函数中执行清理也是一个好主意。但是,如果资源分配是隐式完成的,例如在构造函数中,那么资源的释放也应该是隐式的,而析构函数是该职责的主要候选者。Worker::Start()Worker::Stop()Worker


破坏

让我们检查一下Worker::~Worker()。一般规则是不要在析构函数中抛出异常。如果一个Worker对象在从另一个异常展开的堆栈上,并Worker::~Worker()抛出一个异常,那么std::terminate()将被调用,杀死应用程序。虽然Worker::~Worker()没有显式抛出异常,但重要的是要考虑到它正在调用的某些函数可能会抛出:

如果std::terminate()是所需的行为,则不需要更改。但是,如果std::terminate()不需要,则捕获boost::thread_interrupted并抑制它。

Worker::~Worker()
{
  m_Running = false;
  m_Condition.notify_one();
  try
  {
    m_pThread->join();
  }
  catch ( const boost::thread_interrupted& )
  {
    /* suppressed */ 
  }
}

并发

管理线程可能很棘手。定义函数的确切期望行为(Worker::Wake()如 )以及了解促进线程和同步的类型的行为非常重要。例如,boost::condition_variable::notify_one()如果没有线程被阻塞在boost::condition_variable::wait(). 让我们检查一下可能的并发路径Worker::Wake()

下面是对两种场景的并发性进行粗略的尝试:

  • 操作顺序从上到下发生。(即顶部的操作发生在底部的操作之前。
  • 并发操作写在同一行。
  • <>用于突出显示一个线程何时唤醒或解除阻塞另一个线程。例如A > B表示线程A正在解除阻塞线程B

场景:在被阻塞时Worker::Wake()调用。Worker::ThreadProc()m_Condition

其他主题 | 工人::ThreadProc
------------------------------------------------+-------------- ----------------------------
                                   | 锁(m_Mutex)
                                   | `-- m_Mutex.lock()
                                   | m_Condition::wait(锁)
                                   | |-- m_Mutex.unlock()
                                   | |-- 等待通知
工人::Wake() | |
|-- 锁( m_Mutex ) | |
| `-- m_Mutex.lock() | |
|-- m_Condition::notify_one() > |-- 从通知中唤醒
`-- ~lock() | `-- m_Mutex.lock() // 块
    `-- m_Mutex.unlock() > `-- // 获取锁
                                   | // 在这里做一些工作
                                   | ~lock() // for 循环范围的结束
                                   | `-- m_Mutex.unlock()

结果Worker::Wake()返回相当快,并Worker::ThreadProc运行。


场景:在未被阻塞时Worker::Wake()调用。Worker::ThreadProc()m_Condition

其他主题 | 工人::ThreadProc
------------------------------------------------+-------------- ----------------------------
                                   | 锁(m_Mutex)
                                   | `-- m_Mutex.lock()
                                   | m_Condition::wait(锁)
                                   | |-- m_Mutex.unlock()
Worker::Wake() > |-- 唤醒
                                   | `-- m_Mutex.lock()
工人::Wake() | // 在这里做一些工作
|-- 锁( m_Mutex ) | // 还在工作...
| |-- m_Mutex.lock() // 块 | // 希望我们不要阻塞系统调用
| | | // 还有更多工作...
| | | ~lock() // for 循环范围的结束
| |-- // 仍然被阻塞 < `-- m_Mutex.unlock()
| `-- // 获取锁 | lock( m_Mutex ) // 下一次“for”迭代。
|-- m_Condition::notify_one() | `-- m_Mutex.lock() // 阻塞
`-- ~lock() | |-- // 仍然被阻塞
    `-- m_Mutex.unlock() > `-- // 获取锁
                                   | m_Condition::wait(锁)    
                                   | |-- m_Mutex.unlock()
                                   | `-- 等待通知
                                   | `--仍在等待...

结果:Worker::Wake()像工作一样被阻止Worker::ThreadProc,但是没有操作,因为它m_Condition在没有人等待它时发送了一个通知。

这并不是特别危险Worker::Wake(),但它可能会导致问题Worker::~Worker()。如果在工作时Worker::~Worker()运行,则在加入线程时可能会无限期地阻塞,因为线程可能不会在收到通知时等待,并且仅在等待完成后才检查。Worker::ThreadProcWorker::~Worker()m_ConditionWorker::ThreadProcm_Runningm_Condition


努力寻找解决方案

在此示例中,让我们定义以下要求:

  • Worker::~Worker()不会导致std::terminate()被调用。
  • Worker::Wake()工作时不会阻塞Worker::ThreadProc
  • 如果在不工作Worker::Wake()的时候被调用Worker::ThreadProc,那么它会通知Worker::ThreadProc做工作。
  • 如果在工作Worker::Wake()时被调用Worker::ThreadProc,那么它将通知Worker::ThreadProc执行另一次工作迭代。
  • 多次调用Worker::Wake()while Worker::ThreadProcis doing work 将导致Worker::ThreadProc执行一次额外的工作迭代。

代码:

#include <boost/thread.hpp>
 
class Worker
{
public:
  Worker();
  ~Worker();
  void Wake();
private:
  Worker(Worker const& rhs);             // prevent copying
  Worker& operator=(Worker const& rhs);  // prevent assignment
  void ThreadProc();
 
  enum state { HAS_WORK, NO_WORK, SHUTDOWN };
  
  state                            m_State;
  boost::mutex                     m_Mutex;
  boost::condition_variable        m_Condition;
  boost::thread                    m_Thread;
};
 
Worker::Worker()
  : m_State(NO_WORK)
  , m_Mutex()
  , m_Condition()
  , m_Thread()
{
  m_Thread = boost::thread(&Worker::ThreadProc, this);
}
 
Worker::~Worker()
{
  // Create scope so that the mutex is only locked when changing state and
  // notifying the condition.  It would result in a deadlock if the lock was
  // still held by this function when trying to join the thread.
  {
    boost::lock_guard<boost::mutex> lock(m_Mutex);
    m_State = SHUTDOWN;
    m_Condition.notify_one();
  }
  try { m_Thread.join(); }
  catch ( const boost::thread_interrupted& ) { /* suppress */ };
}
 
void Worker::Wake()
{
  boost::lock_guard<boost::mutex> lock(m_Mutex);
  m_State = HAS_WORK;
  m_Condition.notify_one();
}
 
void Worker::ThreadProc()
{
  for (;;)
  {
    // Create scope to only lock the mutex when checking for the state.  Do
    // not continue to hold the mutex wile doing busy work.
    {
      boost::unique_lock<boost::mutex> lock(m_Mutex);
      // While there is no work (implies not shutting down), then wait on
      // the condition.
      while (NO_WORK == m_State)
      {
        m_Condition.wait(lock);
        // Will wake up from either Wake() or ~Worker() signaling the condition
        // variable.  At that point, m_State will either be HAS_WORK or
        // SHUTDOWN.
      }
      // On shutdown, break out of the for loop.
      if (SHUTDOWN == m_State) break;
      // Set state to indicate no work is queued.
      m_State = NO_WORK;
    }
 
    // do some work here
  }
}

注意:作为个人喜好,我选择不在boost::thread堆上分配,因此我不需要通过boost::scoped_ptr. boost::thread有一个默认构造函数将引用Not-a-Thread,并且它是move-assignable

于 2012-06-29T22:53:28.477 回答