2

我使用 boost:asio 和多​​个 io_services 来保持不同形式的阻塞 I/O 分开。例如,我有一个 io_service 用于阻塞文件 I/O,另一个用于长时间运行的 CPU 密集型任务(这可以扩展到第三个用于阻塞网络 I/O 等)一般来说,我想确保一种形式阻塞 I/O 不能饿死其他的。

我遇到的问题是,由于在一个 io_service 中运行的任务可以将事件发布到其他 io_service(例如,受 CPU 限制的任务可能需要启动文件 I/O 操作,或者完成的文件 I/O 操作可能会调用 CPU-绑定回调),我不知道如何让两个 io_services 都运行,直到它们都没有事件。

通常使用单个 I/O 服务,您可以执行以下操作:

 shared_ptr<asio::io_service> io_service (new asio::io_service);
 shared_ptr<asio::io_service::work> work (
   new asio::io_service::work(*io_service));

 // Create worker thread(s) that call io_service->run()

 io_service->post(/* some event */);

 work.reset();

 // Join worker thread(s)

但是,如果我只是为这两个 io_services 执行此操作,那么我没有在其中发布初始事件的那个会立即完成。即使我向两者发布初始事件,如果 io_service B 上的初始事件在 io_service A 上的任务向 B 发布新事件之前完成,io_service B 也会过早完成。

如何在 io_service A 仍在处理事件时保持 io_service B 运行(因为服务 A 中的一个排队事件可能会向 B 发布新事件),反之亦然,同时仍然确保两个 io_services 退出它们的 run() 方法如果他们同时出局?

4

2 回答 2

3

想出了一种方法来做到这一点,所以记录下来以防其他人在搜索中发现这个问题:

  • 每创建N个交叉通信的io_services,为它们每一个创建一个工作对象,然后启动它们的工作线程。

  • 创建一个不会运行任何工作线程的“主”io_service 对象。

  • 不允许将事件直接发布到服务。相反,为 io_services 创建访问器函数,它将:

    1. 在主线程上创建一个工作对象。
    2. 将回调包装在一个运行真正回调的函数中,然后删除工作。
    3. 而是发布这个包装的回调。
  • 在执行的主要流程中,一旦所有 N io_services 都已启动并且您已将工作发布到其中至少一个,请在主 io_service 上调用 run()。

  • 当master io_service 的run() 方法返回时,删除所有N 个交叉通信io_services 上的初始工作,并加入所有工作线程。

让主 io_service 的线程自己处理每个其他 io_service 可确保它们不会终止,直到主 io_service 用完工作。让每个其他 io_services 自己在主 io_service 上为每个发布的回调工作确保主 io_service 不会耗尽工作,直到每个其他 io_services 不再有任何发布的回调要处理。

一个例子(可以封装在一个类中):

shared_ptr<boost::asio::io_service> master_io_service;

void RunWorker(boost::shared_ptr<boost::asio::io_service> io_service) {
  io_service->run();
}

void RunCallbackAndDeleteWork(boost::function<void()> callback,
                              boost::asio::io_service::work* work) {
  callback();
  delete work;
}

// All new posted callbacks must come through here, rather than being posted
// directly to the io_service object.
void PostToService(boost::shared_ptr<boost::asio::io_service> io_service,
                   boost::function<void()> callback) {
  io_service->post(boost::bind(
      &RunCallbackAndDeleteWork, callback,
      new boost::asio::io_service::work(*master_io_service)));
}

int main() {
  vector<boost::shared_ptr<boost::asio::io_service> > io_services;
  vector<boost::shared_ptr<boost::asio::io_service::work> > initial_work;
  boost::thread_pool worker_threads;

  master_io_service.reset(new boost::asio::io_service);

  const int kNumServices = X;
  const int kNumWorkersPerService = Y;
  for (int i = 0; i < kNumServices; ++i) {
    shared_ptr<boost::asio::io_service> io_service(new boost::asio::io_service);
    io_services.push_back(io_service);
    initial_work.push_back(new boost::asio::io_service::work(*io_service));

    for (int j = 0; j < kNumWorkersPerService; ++j) {
      worker_threads.create_thread(boost::bind(&RunWorker, io_service));
    }
  }

  // Use PostToService to start initial task(s) on at least one of the services

  master_io_service->run();

  // At this point, there is no real work left in the services, only the work
  // objects in the initial_work vector.
  initial_work.clear();
  worker_threads.join_all();
  return 0;
}
于 2013-03-08T18:31:04.843 回答
2

HTTP 服务器示例 2做了类似的事情,您可能会发现它很有用。它使用为每个io_service保留vectorsshared_ptr<boost::asio::io_service>和 a的池的概念。它使用线程池来运行每个服务。shared_ptr<boost::asio::io_service::work>io_service

该示例使用循环调度来分配 I/O 服务的工作,我认为这不适用于您的情况,因为您有io_serviceA 和io_serviceB的特定任务。

于 2013-03-07T00:07:12.857 回答