想出了一种方法来做到这一点,所以记录下来以防其他人在搜索中发现这个问题:
每创建N个交叉通信的io_services,为它们每一个创建一个工作对象,然后启动它们的工作线程。
创建一个不会运行任何工作线程的“主”io_service 对象。
不允许将事件直接发布到服务。相反,为 io_services 创建访问器函数,它将:
- 在主线程上创建一个工作对象。
- 将回调包装在一个运行真正回调的函数中,然后删除工作。
- 而是发布这个包装的回调。
在执行的主要流程中,一旦所有 N io_services 都已启动并且您已将工作发布到其中至少一个,请在主 io_service 上调用 run()。
当master io_service 的run() 方法返回时,删除所有N 个交叉通信io_services 上的初始工作,并加入所有工作线程。
让主 io_service 的线程自己处理每个其他 io_service 可确保它们不会终止,直到主 io_service 用完工作。让每个其他 io_services 自己在主 io_service 上为每个发布的回调工作确保主 io_service 不会耗尽工作,直到每个其他 io_services 不再有任何发布的回调要处理。
一个例子(可以封装在一个类中):
shared_ptr<boost::asio::io_service> master_io_service;
void RunWorker(boost::shared_ptr<boost::asio::io_service> io_service) {
io_service->run();
}
void RunCallbackAndDeleteWork(boost::function<void()> callback,
boost::asio::io_service::work* work) {
callback();
delete work;
}
// All new posted callbacks must come through here, rather than being posted
// directly to the io_service object.
void PostToService(boost::shared_ptr<boost::asio::io_service> io_service,
boost::function<void()> callback) {
io_service->post(boost::bind(
&RunCallbackAndDeleteWork, callback,
new boost::asio::io_service::work(*master_io_service)));
}
int main() {
vector<boost::shared_ptr<boost::asio::io_service> > io_services;
vector<boost::shared_ptr<boost::asio::io_service::work> > initial_work;
boost::thread_pool worker_threads;
master_io_service.reset(new boost::asio::io_service);
const int kNumServices = X;
const int kNumWorkersPerService = Y;
for (int i = 0; i < kNumServices; ++i) {
shared_ptr<boost::asio::io_service> io_service(new boost::asio::io_service);
io_services.push_back(io_service);
initial_work.push_back(new boost::asio::io_service::work(*io_service));
for (int j = 0; j < kNumWorkersPerService; ++j) {
worker_threads.create_thread(boost::bind(&RunWorker, io_service));
}
}
// Use PostToService to start initial task(s) on at least one of the services
master_io_service->run();
// At this point, there is no real work left in the services, only the work
// objects in the initial_work vector.
initial_work.clear();
worker_threads.join_all();
return 0;
}