99

作为 Boost.Asio 的初学者,我对io_service::run(). 如果有人可以在此方法阻止/解除阻止时向我解释,我将不胜感激。文件指出:

run()函数会阻塞,直到所有工作完成并且没有更多的处理程序要分派,或者直到io_service停止。

多个线程可以调用该run()函数来建立一个线程池,io_service可以从中执行处理程序。在池中等待的所有线程都是等效的,io_service可以选择其中任何一个来调用处理程序。

正常退出run()函数意味着io_service对象已停止(stopped()函数返回 true)。后续调用run(),或将立即返回run_one(),除非之前调用过。poll()poll_one()reset()

以下陈述是什么意思?

[...] 不再派发处理程序 [...]


在试图理解 的行为时io_service::run(),我遇到了这个示例(示例 3a)。在其中,我观察到io_service->run()阻塞并等待工作指令。

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

但是,在我正在处理的以下代码中,客户端使用 TCP/IP 进行连接,并且 run 方法会阻塞,直到异步接收到数据。

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

run()在下面的两个示例中描述其行为的任何解释都将受到赞赏。

4

2 回答 2

259

基础

让我们从一个简化的示例开始,并检查相关的 Boost.Asio 部分:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

什么是处理程序

处理程序只不过是一个回调。在示例代码中,有 3 个处理程序:

  • 处理程序( print1)。
  • 处理程序( handle_async_receive3)。
  • 处理程序( print4)。

即使同一个print()函数被使用了两次,每次使用都被认为是创建自己的唯一可识别的处理程序。处理程序可以有多种形状和大小,从上面的基本函数到更复杂的构造,例如从函数boost::bind()和 lambda 生成的函数。不管复杂性如何,处理程序仍然只是一个回调。

什么是工作

工作是 Boost.Asio 被要求代表应用程序代码执行的一些处理。有时 Boost.Asio 可能会在被告知后立即开始一些工作,而有时它可能会等待稍后的时间点完成工作。一旦完成工作,Boost.Asio 将通过调用提供的处理程序来通知应用程序。

Boost.Asio 保证处理程序只会在当前调用run()run_one()poll()或的线程中运行poll_one()。这些线程将工作并调用处理程序。因此,在上面的示例中,print()当它发布到io_service(1) 中时不会调用。相反,它被添加到io_service并且将在稍后的时间点被调用。在这种情况下,它在io_service.run()(5)之内。

什么是异步操作?

异步操作创建工作,Boost.Asio将调用处理程序以在工作完成时通知应用程序。异步操作是通过调用一个名称带有前缀的函数来创建的async_。这些函数也称为启动函数

异步操作可以分解为三个独特的步骤:

  • 启动或通知相关联io_service的工作需要完成。async_receive操作 (3) 通知它io_service需要从套接字异步读取数据,然后async_receive立即返回。
  • 做实际工作。在这种情况下,当socket接收到数据时,字节将被读取并复制到buffer. 实际工作将在以下任一方面完成:
    • 启动函数(3),如果Boost.Asio可以确定它不会阻塞。
    • 当应用程序显式运行时io_service(5)。
  • 调用handle_async_receive ReadHandler。再一次,处理程序只在运行io_service. 因此,无论何时完成工作(3 或 5),都保证handle_async_receive()只会在io_service.run()(5)内调用。

这三个步骤在时间和空间上的分离称为控制流倒置。这是使异步编程变得困难的复杂性之一。但是,有一些技术可以帮助缓解这种情况,例如使用协程

做什么io_service.run()

当线程调用时,将从该线程中调用io_service.run()工作和处理程序。在上面的例子中,io_service.run()(5) 将阻塞直到:

  • 它已从两个print处理程序调用并返回,接收操作以成功或失败完成,并且其handle_async_receive处理程序已被调用并返回。
  • 通过io_service明确停止io_service::stop()
  • 从处理程序中引发异常。

一种潜在的伪流可以描述如下:

创建 io_service
创建套接字
将打印处理程序添加到 io_service (1)
等待套接字连接 (2)
向io_service添加异步读工作请求(3)
将打印处理程序添加到 io_service (4)
运行 io_service (5)
  有工作或处理程序吗?
    是的,有 1 个工作和 2 个处理程序
      socket有数据吗?不,什么都不做
      运行打印处理程序 (1)
  有工作或处理程序吗?
    是的,有 1 个工作和 1 个处理程序
      socket有数据吗?不,什么都不做
      运行打印处理程序 (4)
  有工作或处理程序吗?
    是的,有 1 件作品
      socket有数据吗?不,继续等待
  -- 套接字接收数据 --
      套接字有数据,将其读入缓冲区
      将 handle_async_receive 处理程序添加到 io_service
  有工作或处理程序吗?
    是的,有 1 个处理程序
      运行 handle_async_receive 处理程序 (3)
  有工作或处理程序吗?
    否,将 io_service 设置为已停止并返回

请注意,当读取完成时,它如何将另一个处理程序添加到io_service. 这个微妙的细节是异步编程的一个重要特征。它允许将处理程序链接在一起。例如,如果handle_async_receive没有得到它期望的所有数据,那么它的实现可能会发布另一个异步读取操作,从而导致io_service更多的工作,因此不会从io_service.run().

请注意,当io_service已用完工作时,应用程序必须reset()io_service再次运行之前。


示例问题和示例 3a 代码

现在,让我们检查问题中引用的两段代码。

问题代码

socket->async_receive将工作添加到io_service. 因此,io_service->run()将阻塞直到读取操作以成功或错误完成,并且ClientReceiveEvent已经完成运行或引发异常。

示例 3a代码

为了使其更容易理解,这里有一个较小的注释示例 3a:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

概括地说,程序将创建 2 个线程来处理io_service的事件循环 (2)。这会产生一个简单的线程池,用于计算斐波那契数 (3)。

问题代码和此代码之间的一个主要区别在于,此代码在实际工作之前io_service::run()调用(2) ,并将处理程序添加到(3)。为了防止立即返回,创建了一个对象 (1)。此对象可防止工作结束;因此,不会因为没有工作而返回。io_serviceio_service::run()io_service::workio_serviceio_service::run()

整体流程如下:

  1. 创建并添加io_service::work添加到io_service.
  2. 创建的线程池调用io_service::run(). 这些工作线程不会io_service因为io_service::work对象而返回。
  3. 将 3 个计算斐波那契数的处理程序添加到io_service, 并立即返回。工作线程,而不是主线程,可能会立即开始运行这些处理程序。
  4. 删除io_service::work对象。
  5. 等待工作线程完成运行。这只会在所有 3 个处理程序都完成执行后才会发生,因为它们io_service既没有处理程序也没有工作。

代码可以以与原始代码相同的方式以不同的方式编写,其中将处理程序添加到io_service,然后io_service处理事件循环。这消除了使用的需要io_service::work,并产生以下代码:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

同步与异步

尽管问题中的代码使用了异步操作,但它实际上是同步运行的,因为它正在等待异步操作完成:

socket.async_receive(buffer, handler)
io_service.run();

相当于:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

作为一般经验法则,尽量避免混合同步和异步操作。很多时候,它可以把一个复杂的系统变成一个复杂的系统。这个答案突出了异步编程的优点,其中一些也包含在 Boost.Asio文档中。

于 2013-03-22T16:48:01.787 回答
20

为了简化如何run做,将其视为必须处理一堆纸的员工;它需要一张纸,按照纸上说的做,把纸扔掉,然后拿下一张;当他的床单用完时,它就离开了办公室。每张纸上都可以有任何类型的说明,甚至可以添加一张新纸。回到 asio:您可以通过io_service两种方式赋予作品,本质上是:通过post在您链接的示例中使用它,或者通过使用内部调用post的其他对象io_service,如 thesocket及其async_*方法。

于 2013-03-22T10:40:28.200 回答