7

我将使用单线程池实现 boost::asio 服务器io_serviceHTTP Server 3 示例)。io_service将绑定到 unix 域套接字并将请求从该套接字上的连接传递到不同的线程。为了减少资源消耗,我想让线程池动态化。

这是一个概念。首先创建一个线程。当请求到达并且服务器发现池中没有空闲线程时,它会创建一个新线程并将请求传递给它。服务器最多可以创建某个最大数量的线程。理想情况下,它应该具有暂停空闲一段时间的线程的功能。

有人做过类似的东西吗?或者也许有人有一个相关的例子?

至于我,我想我应该以某种方式覆盖io_service.dispatch以实现这一目标。

4

1 回答 1

8

最初的方法可能存在一些挑战:

  • boost::asio::io_service并非旨在派生或重新实现。注意缺少虚函数。
  • 如果您的线程库不提供查询线程状态的能力,则需要单独管理状态信息。

另一种解决方案是将工作发布到 中io_service,然后检查它在 中放置了多长时间io_service。如果它准备好运行和实际运行之间的时间差超过某个阈值,那么这表明队列中的作业比服务队列的线程多。这样做的一个主要好处是动态线程池增长逻辑与其他逻辑分离。

这是一个使用deadline_timer.

  • 设置为从现在起几秒钟后deadline_timer过期。3
  • 异步等待deadline_timer。处理程序将在设置后3几秒钟内准备好运行deadline_timer
  • 在异步处理程序中,检查相对于计时器设置为过期时间的当前时间。如果大于2秒,则io_service队列正在备份,因此向线程池添加一个线程。

例子:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>

class thread_pool_checker
  : private boost::noncopyable
{
public:

  thread_pool_checker( boost::asio::io_service& io_service,
                       boost::thread_group& threads,
                       unsigned int max_threads,
                       long threshold_seconds,
                       long periodic_seconds )
    : io_service_( io_service ),
      timer_( io_service ),
      threads_( threads ),
      max_threads_( max_threads ),
      threshold_seconds_( threshold_seconds ),
      periodic_seconds_( periodic_seconds )
    {
      schedule_check();
    }

private:

  void schedule_check();
  void on_check( const boost::system::error_code& error );

private:

  boost::asio::io_service&    io_service_;
  boost::asio::deadline_timer timer_;
  boost::thread_group&        threads_;
  unsigned int                max_threads_;
  long                        threshold_seconds_;
  long                        periodic_seconds_;
};

void thread_pool_checker::schedule_check()
{
  // Thread pool is already at max size.
  if ( max_threads_ <= threads_.size() )
  {
    std::cout << "Thread pool has reached its max.  Example will shutdown."
              << std::endl;
    io_service_.stop();
    return;
  }

  // Schedule check to see if pool needs to increase.
  std::cout << "Will check if pool needs to increase in " 
            << periodic_seconds_ << " seconds." << std::endl;
  timer_.expires_from_now( boost::posix_time::seconds( periodic_seconds_ ) );
  timer_.async_wait( 
    boost::bind( &thread_pool_checker::on_check, this,
                 boost::asio::placeholders::error ) );
}

void thread_pool_checker::on_check( const boost::system::error_code& error )
{
  // On error, return early.
  if ( error ) return;

  // Check how long this job was waiting in the service queue.  This
  // returns the expiration time relative to now.  Thus, if it expired
  // 7 seconds ago, then the delta time is -7 seconds.
  boost::posix_time::time_duration delta = timer_.expires_from_now();
  long wait_in_seconds = -delta.seconds();

  // If the time delta is greater than the threshold, then the job
  // remained in the service queue for too long, so increase the
  // thread pool.
  std::cout << "Job job sat in queue for " 
            << wait_in_seconds << " seconds." << std::endl;
  if ( threshold_seconds_ < wait_in_seconds )
  {
    std::cout << "Increasing thread pool." << std::endl;
    threads_.create_thread(
      boost::bind( &boost::asio::io_service::run,
                   &io_service_ ) );
  }

  // Otherwise, schedule another pool check.
  schedule_check();
}

// Busy work functions.
void busy_work( boost::asio::io_service&,
                unsigned int );

void add_busy_work( boost::asio::io_service& io_service,
                    unsigned int count )
{
  io_service.post(
    boost::bind( busy_work,
                 boost::ref( io_service ),
                 count ) );
}

void busy_work( boost::asio::io_service& io_service,
                unsigned int count )
{
  boost::this_thread::sleep( boost::posix_time::seconds( 5 ) );

  count += 1;

  // When the count is 3, spawn additional busy work.
  if ( 3 == count )
  {
    add_busy_work( io_service, 0 );
  }
  add_busy_work( io_service, count );
}

int main()
{
  using boost::asio::ip::tcp;

  // Create io service.
  boost::asio::io_service io_service;

  // Add some busy work to the service.
  add_busy_work( io_service, 0 );

  // Create thread group and thread_pool_checker.
  boost::thread_group threads;
  thread_pool_checker checker( io_service, threads,
                               3,   // Max pool size.
                               2,   // Create thread if job waits for 2 sec.
                               3 ); // Check if pool needs to grow every 3 sec.

  // Start running the io service.
  io_service.run();

  threads.join_all();

  return 0;
}

输出:

将检查池是否需要在 3 秒内增加。
作业作业排队等待 7 秒。
增加线程池。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 0 秒。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 4 秒。
增加线程池。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 0 秒。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 0 秒。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 0 秒。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 3 秒。
增加线程池。
线程池已达到最大值。示例将关闭。
于 2012-06-26T16:44:13.157 回答