28

我正在开发一个多线程应用程序,其中一个线程充当从客户端接收命令的 tcp 服务器。线程使用 Boost 套接字和接受器等待客户端连接,从客户端接收命令,将命令传递给应用程序的其余部分,然后再次等待。这是代码:

void ServerThreadFunc()
{
    using boost::asio::ip::tcp;
    boost::asio::io_service io_service;
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port_no));

    for (;;)
    {
        //  listen for command connection
        tcp::socket socket(io_service);
        acceptor.accept(socket);

        //  connected; receive command
        boost::array<char,256> msg_buf;
        socket.receive(boost::asio::buffer(msg_buf));

        //  do something with received bytes here
    }
}

该线程将大部分时间都花在了对 的调用上acceptor.accept()。目前,线程仅在应用程序退出时终止。不幸的是,这会在 main() 返回后导致崩溃——我相信是因为线程在单例被销毁后尝试访问应用程序的日志记录单例。(当我来到这里时就是这样,老实说。)

当应用程序退出时,我怎样才能干净地关闭这个线程?我读过原始套接字上的阻塞 accept() 调用可以通过从另一个线程关闭套接字来中断,但这似乎不适用于 Boost 套接字。我尝试使用Boost asynchronous tcp echo server example将服务器逻辑转换为异步 i/o ,但这似乎只是将阻塞调用交换为acceptor::accept()阻塞调用io_service::run(),所以我遇到了同样的问题:阻塞我不能打断的电话。有任何想法吗?

4

5 回答 5

30

简而言之,有两种选择:

  • 将代码更改为异步(acceptor::async_accept()async_read),在事件循环中通过 运行io_service::run(),并通过取消io_service::stop()
  • 强制阻塞调用以中断较低级别的机制,例如信号。

我会推荐第一个选项,因为它更可能是便携且更易于维护的。要理解的重要概念是,io_service::run()只要有待处理的工作,就会阻塞。当io_service::stop()被调用时,它会尝试让所有被阻塞的线程io_service::run()尽快返回;即使在事件循环中调用了同步操作,它也不会中断同步操作,例如acceptor::accept()and 。socket::receive()重要的是要注意这io_service::stop()是一个非阻塞调用,因此与被阻塞的线程同步io_service::run()必须使用另一种机制,例如thread::join().

这是一个运行 10 秒并监听端口 8080 的示例:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <iostream>

void StartAccept( boost::asio::ip::tcp::acceptor& );

void ServerThreadFunc( boost::asio::io_service& io_service )
{
  using boost::asio::ip::tcp;
  tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), 8080 ) );

  // Add a job to start accepting connections.
  StartAccept( acceptor );

  // Process event loop.
  io_service.run();

  std::cout << "Server thread exiting." << std::endl;
}

void HandleAccept( const boost::system::error_code& error,
                   boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
                   boost::asio::ip::tcp::acceptor& acceptor )
{
  // If there was an error, then do not add any more jobs to the service.
  if ( error )
  {
    std::cout << "Error accepting connection: " << error.message() 
              << std::endl;
    return;
  }

  // Otherwise, the socket is good to use.
  std::cout << "Doing things with socket..." << std::endl;

  // Perform async operations on the socket.

  // Done using the socket, so start accepting another connection.  This
  // will add a job to the service, preventing io_service::run() from
  // returning.
  std::cout << "Done using socket, ready for another connection." 
            << std::endl;
  StartAccept( acceptor );
};

void StartAccept( boost::asio::ip::tcp::acceptor& acceptor )
{
  using boost::asio::ip::tcp;
  boost::shared_ptr< tcp::socket > socket(
                                new tcp::socket( acceptor.get_io_service() ) );

  // Add an accept call to the service.  This will prevent io_service::run()
  // from returning.
  std::cout << "Waiting on connection" << std::endl;
  acceptor.async_accept( *socket,
    boost::bind( HandleAccept,
      boost::asio::placeholders::error,
      socket,
      boost::ref( acceptor ) ) );
}

int main()
{
  using boost::asio::ip::tcp;

  // Create io service.
  boost::asio::io_service io_service;

  // Create server thread that will start accepting connections.
  boost::thread server_thread( ServerThreadFunc, boost::ref( io_service ) );

  // Sleep for 10 seconds, then shutdown the server.
  std::cout << "Stopping service in 10 seconds..." << std::endl;
  boost::this_thread::sleep( boost::posix_time::seconds( 10 ) );
  std::cout << "Stopping service now!" << std::endl;

  // Stopping the io_service is a non-blocking call.  The threads that are
  // blocked on io_service::run() will try to return as soon as possible, but
  // they may still be in the middle of a handler.  Thus, perform a join on 
  // the server thread to guarantee a block occurs.
  io_service.stop();

  std::cout << "Waiting on server thread..." << std::endl;
  server_thread.join();
  std::cout << "Done waiting on server thread." << std::endl;

  return 0;
}

运行时,我打开了两个连接。这是输出:

10 秒后停止服务...
等待连接
用socket做事...
使用套接字完成,准备进行另一个连接。
等待连接
用socket做事...
使用套接字完成,准备进行另一个连接。
等待连接
立即停止服务!
正在等待服务器线程...
服务器线程退出。
在服务器线程上等待完成。
于 2012-06-25T18:35:23.323 回答
4

当您收到需要退出的事件时,您可以调用acceptor.cancel(),这将取消挂起的接受(错误代码为operation_canceled)。在某些系统上,您可能还必须close()使用接受器以确保安全。

于 2012-06-25T15:51:56.373 回答
4

如果涉及到它,您可以在 localhost 上打开一个到它的临时客户端连接 - 这将唤醒它。您甚至可以向它发送一条特殊消息,以便您可以从 pub 关闭您的服务器 - 应该有一个应用程序:)

于 2012-06-25T16:04:27.463 回答
1

只需使用本机句柄和 SHUT_RD 选项调用关闭,即可取消现有的接收(接受)操作。

于 2016-12-30T18:53:10.680 回答
-1

接受的答案并不完全正确。事实上@JohnYu 回答正确

使用 ASIO 的阻塞 API 很像使用 ASIO 库包装在其类中的 BSD 套接字 API。

问题是boost::asio::ip::tcp::acceptor类不提供shutdown()功能,因此您必须使用“旧”套接字 API 来完成。

附加说明:确保acceptor,socketio_service在所有使用它的线程退出之前没有被删除。在下面的代码std::shared_ptr中,用于保持共享资源处于活动状态,因此ApplicationContext类的用户可以删除ApplicationContext对象并避免SEGFAULT 崩溃

附加说明:注意 boost 文档,有引发异常的重载方法和返回错误代码的方法。原始海报的代码在acceptor->accept(socket);没有 try/catch 的情况下使用,这将导致程序退出而不是正常的线程例程退出和清理。

以下是解决方案说明:

#include <unistd.h> // include ::shutdown() function
// other includes ...

using boost::asio::ip::tcp;
using boost::asio::io_service;

class ApplicationContext {

    // Use shared pointer to extend life of resources afer ApplicationContext is deleted
    // and running threads can still keep using shared resources
    std::shared_ptr<tcp::acceptor> acceptor;
    std::shared_ptr<io_service> ioservice;

    // called `ServerThreadFunc` in question code example
    void AcceptLoopThreadRoutine(int port_no) {
        ioservice = std::make_shared<io_service>();
        acceptor = std::make_shared<tcp::acceptor>(*ioservice, tcp::endpoint(tcp::v4(), port_no));

        try {
            for (;;) {
                // listen for client connection
                tcp::socket socket(*ioservice);
                // Note boost::system::system_error is raised when using this overload
                acceptor->accept(socket);

                // connected receive some data ...
                // // boost::array<char,256> msg_buf;
                // // socket.receive(boost::asio::buffer(msg_buf));
                //  do something with received bytes here
            }
        } catch(std::exception const & exception) {
            // boost::system::system_error here indicates clean exit ;)
        }
    }

    void StopAcceptThread() {
        if(acceptor) {
            // boost::asio::ip::tcp::acceptor does not have shutdown() functionality
            // exposed, so we need to do it with this low-level approach
            int shutdown_status = shutdown(acceptor->native_handle(), SHUT_RDWR);
        }
    }

};

另请注意,使用信号解除阻塞接受线程是非常讨厌的实现,本地主机上的临时客户端连接解除阻塞接受线程非常尴尬。

ASIO 可以帮助您在单线程中通过回调完成所有工作。如果您正在混合线程和 ASIO,那么您的设计可能很糟糕。

附加说明:不要混淆shutdown()close()。某些系统可能允许您close()在接受套接字上使用来解除阻塞接受循环,但这不是可移植的。

于 2020-06-13T07:50:00.567 回答