I would like some feedback on the IService class listed below. As far as I know, this type of class is related to the "active object" pattern; please correct me if I misuse any of the related terminology. The idea is that the classes used with this active-object class must provide a start and a stop method that control some event loop. The event loop could be implemented with a while loop, with boost asio, etc.

This class is responsible for starting a new thread in a non-blocking manner so that events can be handled in the new thread. It must also handle all clean-up. I first tried an OO approach in which subclasses overrode methods to control the event loop, but the cleanup was messy: calling the stop method from the destructor resulted in a pure virtual function call whenever the calling code had not already called stop manually. The templated solution seems a lot cleaner:

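#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>

template <typename T>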
class IService : private boost::noncopyable
{
    typedef boost::shared_ptr<boost::thread> thread_ptr;
public:

  IService()
  {
  }

  ~IService()
  {
    /// try to stop the service in case it's running
    stop();
  }

  void start()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);

    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      // already running
      return;
    }

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService::main, this)));

    // need to wait for thread to start: else if destructor is called before thread has started

    // Wait for condition to be signaled and then
    // try timed wait since the application could deadlock if the thread never starts?
    //if (m_startCondition.timed_wait(m_threadMutex, boost::posix_time::milliseconds(getServiceTimeoutMs())))
    //{
    //}
    m_startCondition.wait(m_threadMutex);

    // notify main to continue: it's blocked on the same condition var
    m_startCondition.notify_one();
  }

  void stop()
  {
    // trigger the stopping of the event loop
    m_serviceObject.stop();

    if (m_pServiceThread)
    {
      if (m_pServiceThread->joinable())
      {
        m_pServiceThread->join();
      }
      // the service is stopped so we can reset the thread
      m_pServiceThread.reset();
    }
  }

private:
  /// entry point of thread
  void main()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);
    // notify main thread that it can continue
    m_startCondition.notify_one();

    // Try Dummy wait to allow 1st thread to resume???
    m_startCondition.wait(m_threadMutex);

    // call template implementation of event loop
    m_serviceObject.start();
  }

  /// Service thread
  thread_ptr m_pServiceThread;
  /// Thread mutex
  mutable boost::mutex m_threadMutex;
  /// Condition for signaling start of thread
  boost::condition m_startCondition;

  /// T must satisfy the implicit service interface and provide a start and a stop method
  T m_serviceObject;
};

The class could be used as follows:

#include <boost/asio.hpp>
#include <boost/bind.hpp>

class TestObject3
{
public:
  TestObject3()
      :m_work(m_ioService),
      m_timer(m_ioService, boost::posix_time::milliseconds(200))
  {
      m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error));
  }

  void start()
  {
      // simple event loop
      m_ioService.run();
  }

  void stop()
  {
      // signal end of event loop
      m_ioService.stop();
  }

  void doWork(const boost::system::error_code& e)
  {
      // Do some work here
      if (e != boost::asio::error::operation_aborted)
      {
          m_timer.expires_from_now(boost::posix_time::milliseconds(200));
          m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error));
      }
  }

private:
  boost::asio::io_service m_ioService;
  boost::asio::io_service::work m_work;
  boost::asio::deadline_timer m_timer;
};

Now to my specific questions:

1) Is the use of the boost condition variable correct? It seems like a bit of a hack to me: I wanted to wait for the thread to be launched, so the start method waits on the condition variable. Once the new thread is running in the main method, it notifies and then waits on the same condition variable to allow the initial thread to continue. Once the initial thread has left the start method, the new thread can continue. Is this OK?

2) Are there any cases in which the thread would not get launched successfully by the OS? I remember reading somewhere that this can occur. If it is possible, should I rather do a timed wait on the condition variable (as is commented out in the start method)?

3) I am aware that the templated class might not implement the stop method "correctly", i.e. if the event loop fails to stop, the code will block on the join (either in stop or in the destructor), but I see no way around this. I guess it is up to the user of the class to make sure that the start and stop methods are implemented correctly?

4) I would appreciate pointers to any other design mistakes, possible improvements, etc.

Thanks!

1 Answer

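In the end I settled on the following:

1) After extensive testing, the use of the condition variable seems to be fine

2) This problem has not come up (yet)

3) The templated class implementation must satisfy the requirements; unit tests are used to test correctness

4) Improvements

  • Added a join method protected by a lock
  • Exceptions are caught in the spawned thread and rethrown in the main thread, to avoid crashes without losing the exception information
  • boost::system::error_code is used to pass error codes back to the caller
  • The implementation object is settable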
Code:

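#include <cassert>
#include <boost/bind.hpp>
#include <boost/exception_ptr.hpp>
#include <boost/make_shared.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/system/error_code.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>

template <typename T>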
class IService : private boost::noncopyable
{
  typedef boost::shared_ptr<boost::thread> thread_ptr;
  typedef T ServiceImpl;
public:
  typedef boost::shared_ptr<IService<T> > ptr;

  IService()
    :m_pServiceObject(&m_serviceObject)
  {
  }

  ~IService()
  {
    /// try to stop the service in case it's running
    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      stop();
    }
  }

  static ptr create()
  {
    return boost::make_shared<IService<T> >();
  }

  /// Accessor to service implementation. The handle can be used to configure the implementation object
  ServiceImpl& get() { return m_serviceObject; }
  /// Mutator to service implementation. The handle can be used to configure the implementation object
  void set(ServiceImpl rServiceImpl)
  {
    // the implementation object cannot be modified once the thread has been created
    assert(m_pServiceThread == 0);
    m_serviceObject = rServiceImpl;
    m_pServiceObject = &m_serviceObject;
  }

  void set(ServiceImpl* pServiceImpl)
  {
    // the implementation object cannot be modified once the thread has been created
    assert(m_pServiceThread == 0);

    // make sure service object is valid
    if (pServiceImpl)
      m_pServiceObject = pServiceImpl; 
  }

  /// if the service implementation reports an error from the start or stop method call, it can be accessed via this method
  /// NB: only the last error can be accessed
  boost::system::error_code getServiceErrorCode() const { return m_ecService; }

  /// The join method allows the caller to block until thread completion
  void join()
  {
    // protect this method from being called twice (e.g. by user and by stop)
    boost::mutex::scoped_lock lock(m_joinMutex);
    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      m_pServiceThread->join();
      m_pServiceThread.reset();
    }
  }

  /// This method launches the non-blocking service
  boost::system::error_code start()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);

    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      // already running
      // (SHARED_INVALID_STATE and shared_category are defined elsewhere and not shown here)
      return boost::system::error_code(SHARED_INVALID_STATE, shared_category);
    }

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService::main, this)));
    // Wait for condition to be signaled
    m_startCondition.wait(m_threadMutex);

    // notify main to continue: it's blocked on the same condition var
    m_startCondition.notify_one();
    // No error
    return boost::system::error_code();
  }

  /// This method stops the non-blocking service
  boost::system::error_code stop()
  {
    // trigger the stopping of the event loop
    //boost::system::error_code ec = m_serviceObject.stop();
    assert(m_pServiceObject);
    boost::system::error_code ec = m_pServiceObject->stop();
    if (ec)
    {
      m_ecService = ec;
      return ec;
    }

    // The service implementation can return an error code here for more information.
    // However, it is the responsibility of the implementation to stop the service event loop (if running).
    // Failure to do so will cause the join below to block.
    // If this occurs in practice, we may consider a timed join?
    join();

    // If an exception was thrown in the new thread, rethrow it.
    // Should the template implementation class want to avoid this, it should catch the exception
    // in its start method and return an error code instead
    if( m_exception )
      boost::rethrow_exception(m_exception);

    return ec;
  }

private:
  /// runs in its own thread
  void main()
  {
    try
    {
      boost::mutex::scoped_lock lock(m_threadMutex);
      // notify main thread that it can continue
      m_startCondition.notify_one();
      // Try Dummy wait to allow 1st thread to resume
      m_startCondition.wait(m_threadMutex);

      // call implementation of event loop
      // This will block
      // In scenarios where the service fails to start, the implementation can return an error code
      m_ecService = m_pServiceObject->start();

      m_exception = boost::exception_ptr();
    } 
    catch (...)
    {
      m_exception = boost::current_exception();
    }
  }

  /// Service thread
  thread_ptr m_pServiceThread;
  /// Thread mutex
  mutable boost::mutex m_threadMutex;
  /// Join mutex
  mutable boost::mutex m_joinMutex;
  /// Condition for signaling start of thread
  boost::condition m_startCondition;

  /// T must satisfy the implicit service interface and provide a start and a stop method
  T m_serviceObject;
  T* m_pServiceObject;
  // Error code for service implementation errors
  boost::system::error_code m_ecService;

  // Exception ptr to transport exception across different threads
  boost::exception_ptr m_exception;
};
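
The handshake between start() and main() above relies on wait() calls without a predicate, and a condition variable wait is allowed to return on a spurious wakeup, so testing alone may not expose every interleaving. A common alternative is to guard the wait with a flag; a timed wait then also covers the concern from question 2. The following is a minimal sketch under stated assumptions, not a drop-in replacement: it uses C++11 std::mutex and std::condition_variable instead of Boost, and StartGate, signalStarted and waitUntilStarted are hypothetical names.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the m_startCondition handshake.
class StartGate
{
public:
  StartGate() : m_started(false) {}

  // Called by the new thread as its first action.
  void signalStarted()
  {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_started = true;
    }
    m_condition.notify_all();
  }

  // Called by the launching thread. Returns false if the flag was not set
  // within `timeout`; spurious wakeups are absorbed by the predicate.
  bool waitUntilStarted(std::chrono::milliseconds timeout)
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    return m_condition.wait_for(lock, timeout, [this] { return m_started; });
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_condition;
  bool m_started;
};
```

Because the flag records that the thread has started, the launching thread no longer needs to notify back, and the new thread does not need the second "dummy" wait: it can set the flag, release the mutex and enter the event loop directly.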

Further feedback/criticism is of course welcome.

Answered 2011-03-15T06:51:53.007