简而言之,您需要用另一个函数包装用户提供的任务,该函数将:
- 调用用户函数或可调用对象。
- 锁定互斥锁并递减计数器。
我可能不了解此线程池的所有要求。因此,为了清楚起见,这里有一个关于我认为是要求的明确列表:
- 池管理线程的生命周期。用户不应该能够删除驻留在池中的线程。
- 用户可以以非侵入方式将任务分配给池。
- 分配任务时,如果池中的所有线程当前都在运行其他任务,则丢弃该任务。
在提供实现之前,我想强调几个关键点:
- 一旦一个线程被启动,它将一直运行直到完成、取消或终止。线程正在执行的函数不能被重新分配。为了允许单个线程在其生命周期中执行多个函数,线程将希望使用将从队列中读取的函数启动,例如
io_service::run()
,并将可调用类型发布到事件队列中,例如 from io_service::post()
。
io_service::run()
如果 中没有待处理的工作,则返回io_service
,io_service
停止,或者线程正在运行的处理程序抛出异常。为了防止io_serivce::run()
在没有未完成的工作时返回,io_service::work
可以使用该类。
- 定义任务的类型要求(即任务的类型必须可通过
object()
语法调用)而不是要求类型(即任务必须继承自process
),为用户提供了更大的灵活性。它允许用户提供任务作为函数指针或提供 nullary 的类型operator()
。
使用实现boost::asio
:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}
/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}
/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}
private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};
关于实现的几点评论:
- 异常处理需要围绕用户的任务进行。如果用户的函数或可调用对象抛出非 type 的异常,
boost::thread_interrupted
则std::terminate()
调用。这是 Boost.Thread在线程函数行为中出现异常的结果。Boost.Asio从 handlers 抛出异常的效果也值得一读。
- 如果用户提供
task
via boost::bind
,则嵌套boost::bind
将无法编译。需要以下选项之一:
- 不支持
task
由boost::bind
.
- 元编程根据用户的类型是否可以使用结果来执行编译时分支,因为
boost::bind
只有在某些函数对象上才能正确运行。boost::protect
boost::protect
task
使用另一种类型间接传递对象。我选择以boost::function
丢失确切类型为代价来提高可读性。 boost::tuple
,虽然可读性稍差,但也可用于保留确切的类型,如 Boost.Asio 的序列化示例所示。
应用程序代码现在可以thread_pool
非侵入性地使用该类型:
void work() {};
struct worker
{
void operator()() {};
};
void more_work( int ) {};
int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}
可以在没有 Boost.Asio的thread_pool
情况下创建,并且对维护者来说可能稍微容易一些,因为他们不再需要了解Boost.Asio
行为,例如何时io_service::run()
返回以及io_service::work
对象是什么:
#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}
/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}
try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Set task and signal condition variable so that a worker thread will
// wake up andl use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}
private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;
// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();
lock.unlock();
// Run the task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};