7

现在的情况

我使用 boost.asio 实现了一个 TCP 服务器,它当前使用一个io_service对象,我run从一个线程调用该方法。

到目前为止,服务器能够立即响应客户端的请求,因为它在内存中拥有所有必要的信息(接收处理程序中不需要长时间运行的操作)。

问题

现在需求发生了变化,我需要从数据库(使用 ODBC)中获取一些信息——这基本上是一个长时间运行的阻塞操作——以便为客户端创建响应。

我看到了几种方法,但我不知道哪种方法最好(而且可能还有更多方法):

第一种方法

我可以将长时间运行的操作保留在处理程序中,并简单地io_service.run()从多个线程调用。我想我会使用尽可能多的线程,因为我有可用的 CPU 内核?

虽然这种方法很容易实现,但我认为我不会使用这种方法获得最佳性能,因为线程数量有限(由于数据库访问更多是 I/O 绑定操作,因此大部分时间都处于空闲状态比计算受限的操作)。

第二种方法

本文件的第 6 节中,它说:

将线程用于长时间运行的任务

单线程设计的一种变体,这种设计仍然使用单个 io_service::run() 线程来实现协议逻辑。长时间运行或阻塞的任务被传递给后台线程,一旦完成,结果就会被发送回 io_service::run() 线程。

这听起来很有希望,但我不知道如何实现。任何人都可以为这种方法提供一些代码片段/示例吗?

第三种方法

Boris Schäling 在其 boost 介绍的第 7.5 节中解释了如何使用自定义服务扩展 boost.asio。

这看起来工作量很大。与其他方法相比,这种方法有什么好处吗?

4

2 回答 2

16

这些方法不是明确相互排斥的。我经常看到第一个和第二个的组合:

  • 一个或多个线程在一个中处理网络 I/O io_service
  • 长时间运行或阻塞的任务被发布到不同的io_service. 这io_service用作一个线程池,不会干扰处理网络 I/O 的线程。或者,每次需要长时间运行或阻塞的任务时,都可以生成一个分离的线程;但是,线程创建/销毁的开销可能会产生显着影响。

这个答案提供了一个线程池实现。此外,这是一个试图强调两者之间交互的基本示例io_services

#include <iostream>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/chrono.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>

/// @brief Background service will function as a thread-pool where
///        long-standing blocking operations may occur without affecting
///        the network event loop.
boost::asio::io_service background_service;

/// @brief The main io_service will handle network operations.
boost::asio::io_service io_service;

boost::optional<boost::asio::io_service::work> work;

/// @brief ODBC blocking operation.
///
/// @brief data Data to use for query.
/// @brief handler Handler to invoke upon completion of operation.
template <typename Handler>
void query_odbc(unsigned int data,
                Handler handler)
{
  std::cout << "in background service, start querying odbc\n";
  std::cout.flush();
  // Mimic busy work.
  boost::this_thread::sleep_for(boost::chrono::seconds(5));

  std::cout << "in background service, posting odbc result to main service\n";
  std::cout.flush();
  io_service.post(boost::bind(handler, data * 2));
}

/// @brief Functions as a continuation for handle_read, that will be
///        invoked with results from ODBC.
void handle_read_odbc(unsigned int result)
{
  std::stringstream stream;
  stream << "in main service, got " << result << " from odbc.\n";
  std::cout << stream.str();
  std::cout.flush();

  // Allow io_service to stop in this example.
  work = boost::none;
}

/// @brief Mocked up read handler that will post work into a background
///        service.
void handle_read(const boost::system::error_code& error,
                 std::size_t bytes_transferred)
{
  std::cout << "in main service, need to query odbc" << std::endl;
  typedef void (*handler_type)(unsigned int);
  background_service.post(boost::bind(&query_odbc<handler_type>,
    21,                // data
    &handle_read_odbc) // handler
  );

  // Keep io_service event loop running in this example.
  work = boost::in_place(boost::ref(io_service));
} 

/// @brief Loop to show concurrency.
void print_loop(unsigned int iteration)
{
  if (!iteration) return;

  std::cout << "  in main service, doing work.\n";
  std::cout.flush();
  boost::this_thread::sleep_for(boost::chrono::seconds(1));
  io_service.post(boost::bind(&print_loop, --iteration));  
}

int main()
{
  boost::optional<boost::asio::io_service::work> background_work(
      boost::in_place(boost::ref(background_service)));

  // Dedicate 3 threads to performing long-standing blocking operations.
  boost::thread_group background_threads;
  for (std::size_t i = 0; i < 3; ++i)
    background_threads.create_thread(
      boost::bind(&boost::asio::io_service::run, &background_service));

  // Post a mocked up 'handle read' handler into the main io_service.
  io_service.post(boost::bind(&handle_read,
    make_error_code(boost::system::errc::success), 0));

  // Post a mockup loop into the io_service to show concurrency.
  io_service.post(boost::bind(&print_loop, 5));  

  // Run the main io_service.
  io_service.run();

  // Cleanup background.
  background_work = boost::none;
  background_threads.join_all();
}

和输出:

在主服务中,需要查询 odbc
  在主要服务,做工作。
在后台服务中,开始查询 odbc
  在主要服务,做工作。
  在主要服务,做工作。
  在主要服务,做工作。
  在主要服务,做工作。
在后台服务中,将 odbc 结果发布到主服务
在主要服务中,从 odbc 获得 42。

请注意,处理主帖子的单线程io_service进入background_service,然后在background_service阻塞时继续处理其事件循环。一旦background_service得到结果,它就会将一个处理程序发布到 mainio_service中。

于 2013-07-15T13:32:56.120 回答
1

我们的服务器中有相同的长期运行任务(带有存储的旧协议)。所以我们的服务器正在运行 200 个线程以避免阻塞服务(是的,200 个线程正在运行io_service::run)。它不是太好的东西,但现在效果很好。

我们遇到的唯一问题是asio::strand它使用了所谓的“实现”,它在当前调用 hadler 时被锁定。解决了这个问题,通过增加这个股线桶和通过io_service::post没有股线包裹的“分离”任务。

有些任务可能会运行几秒钟甚至几分钟,这目前确实没有问题。

于 2013-07-15T18:53:06.983 回答