基础
让我们从一个简化的示例开始,并检查相关的 Boost.Asio 部分:
void handle_async_receive(...) { ... }
void print() { ... }
...
boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);
...
io_service.post(&print); // 1
socket.connect(endpoint); // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print); // 4
io_service.run(); // 5
什么是处理程序?
处理程序只不过是一个回调。在示例代码中,有 3 个处理程序:
- 处理程序(
print
1)。
- 处理程序(
handle_async_receive
3)。
- 处理程序(
print
4)。
即使同一个print()
函数被使用了两次,每次使用都被认为是创建自己的唯一可识别的处理程序。处理程序可以有多种形状和大小,从上面的基本函数到更复杂的构造,例如从函数boost::bind()
和 lambda 生成的函数。不管复杂性如何,处理程序仍然只是一个回调。
什么是工作?
工作是 Boost.Asio 被要求代表应用程序代码执行的一些处理。有时 Boost.Asio 可能会在被告知后立即开始一些工作,而有时它可能会等待稍后的时间点完成工作。一旦完成工作,Boost.Asio 将通过调用提供的处理程序来通知应用程序。
Boost.Asio 保证处理程序只会在当前调用run()
、run_one()
、poll()
或的线程中运行poll_one()
。这些线程将工作并调用处理程序。因此,在上面的示例中,print()
当它发布到io_service
(1) 中时不会调用。相反,它被添加到io_service
并且将在稍后的时间点被调用。在这种情况下,它在io_service.run()
(5)之内。
什么是异步操作?
异步操作创建工作,Boost.Asio将调用处理程序以在工作完成时通知应用程序。异步操作是通过调用一个名称带有前缀的函数来创建的async_
。这些函数也称为启动函数。
异步操作可以分解为三个独特的步骤:
- 启动或通知相关联
io_service
的工作需要完成。async_receive
操作 (3) 通知它io_service
需要从套接字异步读取数据,然后async_receive
立即返回。
- 做实际工作。在这种情况下,当
socket
接收到数据时,字节将被读取并复制到buffer
. 实际工作将在以下任一方面完成:
- 启动函数(3),如果Boost.Asio可以确定它不会阻塞。
- 当应用程序显式运行时
io_service
(5)。
- 调用
handle_async_receive
ReadHandler。再一次,处理程序只在运行io_service
. 因此,无论何时完成工作(3 或 5),都保证handle_async_receive()
只会在io_service.run()
(5)内调用。
这三个步骤在时间和空间上的分离称为控制流倒置。这是使异步编程变得困难的复杂性之一。但是,有一些技术可以帮助缓解这种情况,例如使用协程。
做什么io_service.run()
?
当线程调用时,将从该线程中调用io_service.run()
工作和处理程序。在上面的例子中,io_service.run()
(5) 将阻塞直到:
- 它已从两个
print
处理程序调用并返回,接收操作以成功或失败完成,并且其handle_async_receive
处理程序已被调用并返回。
- 通过
io_service
明确停止io_service::stop()
。
- 从处理程序中引发异常。
一种潜在的伪流可以描述如下:
创建 io_service
创建套接字
将打印处理程序添加到 io_service (1)
等待套接字连接 (2)
向io_service添加异步读工作请求(3)
将打印处理程序添加到 io_service (4)
运行 io_service (5)
有工作或处理程序吗?
是的,有 1 个工作和 2 个处理程序
socket有数据吗?不,什么都不做
运行打印处理程序 (1)
有工作或处理程序吗?
是的,有 1 个工作和 1 个处理程序
socket有数据吗?不,什么都不做
运行打印处理程序 (4)
有工作或处理程序吗?
是的,有 1 件作品
socket有数据吗?不,继续等待
-- 套接字接收数据 --
套接字有数据,将其读入缓冲区
将 handle_async_receive 处理程序添加到 io_service
有工作或处理程序吗?
是的,有 1 个处理程序
运行 handle_async_receive 处理程序 (3)
有工作或处理程序吗?
否,将 io_service 设置为已停止并返回
请注意,当读取完成时,它如何将另一个处理程序添加到io_service
. 这个微妙的细节是异步编程的一个重要特征。它允许将处理程序链接在一起。例如,如果handle_async_receive
没有得到它期望的所有数据,那么它的实现可能会发布另一个异步读取操作,从而导致io_service
更多的工作,因此不会从io_service.run()
.
请注意,当io_service
已用完工作时,应用程序必须reset()
在io_service
再次运行之前。
示例问题和示例 3a 代码
现在,让我们检查问题中引用的两段代码。
问题代码
socket->async_receive
将工作添加到io_service
. 因此,io_service->run()
将阻塞直到读取操作以成功或错误完成,并且ClientReceiveEvent
已经完成运行或引发异常。
为了使其更容易理解,这里有一个较小的注释示例 3a:
void CalculateFib(std::size_t n);
int main()
{
boost::asio::io_service io_service;
boost::optional<boost::asio::io_service::work> work = // '. 1
boost::in_place(boost::ref(io_service)); // .'
boost::thread_group worker_threads; // -.
for(int x = 0; x < 2; ++x) // :
{ // '.
worker_threads.create_thread( // :- 2
boost::bind(&boost::asio::io_service::run, &io_service) // .'
); // :
} // -'
io_service.post(boost::bind(CalculateFib, 3)); // '.
io_service.post(boost::bind(CalculateFib, 4)); // :- 3
io_service.post(boost::bind(CalculateFib, 5)); // .'
work = boost::none; // 4
worker_threads.join_all(); // 5
}
概括地说,程序将创建 2 个线程来处理io_service
的事件循环 (2)。这会产生一个简单的线程池,用于计算斐波那契数 (3)。
问题代码和此代码之间的一个主要区别在于,此代码在实际工作之前io_service::run()
调用(2) ,并将处理程序添加到(3)。为了防止立即返回,创建了一个对象 (1)。此对象可防止工作结束;因此,不会因为没有工作而返回。io_service
io_service::run()
io_service::work
io_service
io_service::run()
整体流程如下:
- 创建并添加
io_service::work
添加到io_service
.
- 创建的线程池调用
io_service::run()
. 这些工作线程不会io_service
因为io_service::work
对象而返回。
- 将 3 个计算斐波那契数的处理程序添加到
io_service
, 并立即返回。工作线程,而不是主线程,可能会立即开始运行这些处理程序。
- 删除
io_service::work
对象。
- 等待工作线程完成运行。这只会在所有 3 个处理程序都完成执行后才会发生,因为它们
io_service
既没有处理程序也没有工作。
代码可以以与原始代码相同的方式以不同的方式编写,其中将处理程序添加到io_service
,然后io_service
处理事件循环。这消除了使用的需要io_service::work
,并产生以下代码:
int main()
{
boost::asio::io_service io_service;
io_service.post(boost::bind(CalculateFib, 3)); // '.
io_service.post(boost::bind(CalculateFib, 4)); // :- 3
io_service.post(boost::bind(CalculateFib, 5)); // .'
boost::thread_group worker_threads; // -.
for(int x = 0; x < 2; ++x) // :
{ // '.
worker_threads.create_thread( // :- 2
boost::bind(&boost::asio::io_service::run, &io_service) // .'
); // :
} // -'
worker_threads.join_all(); // 5
}
同步与异步
尽管问题中的代码使用了异步操作,但它实际上是同步运行的,因为它正在等待异步操作完成:
socket.async_receive(buffer, handler)
io_service.run();
相当于:
boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);
作为一般经验法则,尽量避免混合同步和异步操作。很多时候,它可以把一个复杂的系统变成一个复杂的系统。这个答案突出了异步编程的优点,其中一些也包含在 Boost.Asio文档中。