最初的方法可能存在一些挑战:
boost::asio::io_service
并非旨在派生或重新实现。注意缺少虚函数。
- 如果您的线程库不提供查询线程状态的能力,则需要单独管理状态信息。
另一种解决方案是将工作发布到 中io_service
,然后检查它在 中放置了多长时间io_service
。如果它准备好运行和实际运行之间的时间差超过某个阈值,那么这表明队列中的作业比服务队列的线程多。这样做的一个主要好处是动态线程池增长逻辑与其他逻辑分离。
这是一个使用deadline_timer
.
- 设置为从现在起几秒钟后
deadline_timer
过期。3
- 异步等待
deadline_timer
。处理程序将在设置后3
几秒钟内准备好运行deadline_timer
。
- 在异步处理程序中,检查相对于计时器设置为过期时间的当前时间。如果大于
2
秒,则io_service
队列正在备份,因此向线程池添加一个线程。
例子:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
class thread_pool_checker
: private boost::noncopyable
{
public:
thread_pool_checker( boost::asio::io_service& io_service,
boost::thread_group& threads,
unsigned int max_threads,
long threshold_seconds,
long periodic_seconds )
: io_service_( io_service ),
timer_( io_service ),
threads_( threads ),
max_threads_( max_threads ),
threshold_seconds_( threshold_seconds ),
periodic_seconds_( periodic_seconds )
{
schedule_check();
}
private:
void schedule_check();
void on_check( const boost::system::error_code& error );
private:
boost::asio::io_service& io_service_;
boost::asio::deadline_timer timer_;
boost::thread_group& threads_;
unsigned int max_threads_;
long threshold_seconds_;
long periodic_seconds_;
};
void thread_pool_checker::schedule_check()
{
// Thread pool is already at max size.
if ( max_threads_ <= threads_.size() )
{
std::cout << "Thread pool has reached its max. Example will shutdown."
<< std::endl;
io_service_.stop();
return;
}
// Schedule check to see if pool needs to increase.
std::cout << "Will check if pool needs to increase in "
<< periodic_seconds_ << " seconds." << std::endl;
timer_.expires_from_now( boost::posix_time::seconds( periodic_seconds_ ) );
timer_.async_wait(
boost::bind( &thread_pool_checker::on_check, this,
boost::asio::placeholders::error ) );
}
void thread_pool_checker::on_check( const boost::system::error_code& error )
{
// On error, return early.
if ( error ) return;
// Check how long this job was waiting in the service queue. This
// returns the expiration time relative to now. Thus, if it expired
// 7 seconds ago, then the delta time is -7 seconds.
boost::posix_time::time_duration delta = timer_.expires_from_now();
long wait_in_seconds = -delta.seconds();
// If the time delta is greater than the threshold, then the job
// remained in the service queue for too long, so increase the
// thread pool.
std::cout << "Job job sat in queue for "
<< wait_in_seconds << " seconds." << std::endl;
if ( threshold_seconds_ < wait_in_seconds )
{
std::cout << "Increasing thread pool." << std::endl;
threads_.create_thread(
boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
// Otherwise, schedule another pool check.
schedule_check();
}
// Busy work functions.
void busy_work( boost::asio::io_service&,
unsigned int );
void add_busy_work( boost::asio::io_service& io_service,
unsigned int count )
{
io_service.post(
boost::bind( busy_work,
boost::ref( io_service ),
count ) );
}
void busy_work( boost::asio::io_service& io_service,
unsigned int count )
{
boost::this_thread::sleep( boost::posix_time::seconds( 5 ) );
count += 1;
// When the count is 3, spawn additional busy work.
if ( 3 == count )
{
add_busy_work( io_service, 0 );
}
add_busy_work( io_service, count );
}
int main()
{
using boost::asio::ip::tcp;
// Create io service.
boost::asio::io_service io_service;
// Add some busy work to the service.
add_busy_work( io_service, 0 );
// Create thread group and thread_pool_checker.
boost::thread_group threads;
thread_pool_checker checker( io_service, threads,
3, // Max pool size.
2, // Create thread if job waits for 2 sec.
3 ); // Check if pool needs to grow every 3 sec.
// Start running the io service.
io_service.run();
threads.join_all();
return 0;
}
输出:
将检查池是否需要在 3 秒内增加。
作业作业排队等待 7 秒。
增加线程池。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 0 秒。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 4 秒。
增加线程池。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 0 秒。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 0 秒。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 0 秒。
将检查池是否需要在 3 秒内增加。
作业作业在队列中等待了 3 秒。
增加线程池。
线程池已达到最大值。示例将关闭。