我使用的解决方案依赖于我拥有线程池对象的实现这一事实。我创建了一个包装器类型,它将更新统计信息,并复制发布到线程池的用户定义的处理程序。只有这种包装器类型会发布到底层io_service
. 这种方法允许我跟踪发布/执行的处理程序,而不必侵入用户代码。
这是一个精简和简化的示例:
#include <iostream>
#include <memory>
#include <vector>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
// Supports scheduling anonymous jobs that are
// executable as returning nothing and taking
// no arguments
typedef std::function<void(void)> functor_type;
// some way to store per-thread statistics
typedef std::map<boost::thread::id, int> thread_jobcount_map;
// only this type is actually posted to
// the asio proactor, this delegates to
// the user functor in operator()
struct handler_wrapper
{
handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics)
: user_functor_(user_functor)
, statistics_(statistics)
{
}
void operator()()
{
user_functor_();
// just for illustration purposes, assume a long running job
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
// increment executed jobs
++statistics_[boost::this_thread::get_id()];
}
functor_type user_functor_;
thread_jobcount_map& statistics_;
};
// anonymous thread function, just runs the proactor
void thread_func(boost::asio::io_service& proactor)
{
proactor.run();
}
class ThreadPool
{
public:
ThreadPool(size_t thread_count)
{
threads_.reserve(thread_count);
work_.reset(new boost::asio::io_service::work(proactor_));
for(size_t curr = 0; curr < thread_count; ++curr)
{
boost::thread th(thread_func, boost::ref(proactor_));
// inserting into this map before any work can be scheduled
// on it, means that we don't have to look it for lookups
// since we don't dynamically add threads
thread_jobcount_.insert(std::make_pair(th.get_id(), 0));
threads_.emplace_back(std::move(th));
}
}
// the only way for a user to get work into
// the pool is to use this function, which ensures
// that the handler_wrapper type is used
void schedule(const functor_type& user_functor)
{
handler_wrapper to_execute(user_functor, thread_jobcount_);
proactor_.post(to_execute);
}
void join()
{
// join all threads in pool:
work_.reset();
proactor_.stop();
std::for_each(
threads_.begin(),
threads_.end(),
[] (boost::thread& t)
{
t.join();
});
}
// just an example showing statistics
void log()
{
std::for_each(
thread_jobcount_.begin(),
thread_jobcount_.end(),
[] (const thread_jobcount_map::value_type& it)
{
std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n";
});
}
private:
std::vector<boost::thread> threads_;
std::unique_ptr<boost::asio::io_service::work> work_;
boost::asio::io_service proactor_;
thread_jobcount_map thread_jobcount_;
};
struct add
{
add(int lhs, int rhs, int* result)
: lhs_(lhs)
, rhs_(rhs)
, result_(result)
{
}
void operator()()
{
*result_ = lhs_ + rhs_;
}
int lhs_,rhs_;
int* result_;
};
int main(int argc, char **argv)
{
// some "state objects" that are
// manipulated by the user functors
int x = 0, y = 0, z = 0;
// pool of three threads
ThreadPool pool(3);
// schedule some handlers to do some work
pool.schedule(add(5, 4, &x));
pool.schedule(add(2, 2, &y));
pool.schedule(add(7, 8, &z));
// give all the handlers time to execute
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
std::cout
<< "x = " << x << "\n"
<< "y = " << y << "\n"
<< "z = " << z << "\n";
pool.join();
pool.log();
}
输出:
x = 9
y = 4
z = 15
Thread: 0000000000B25430 executed 1 jobs
Thread: 0000000000B274F0 executed 1 jobs
Thread: 0000000000B27990 executed 1 jobs