0

我最近一直在玩 boost asio 和一些新的 c++11 结构。这是导致意外行为的示例代码部分(至少对我而言)。

void Server::startAccept()
{
    connections_.push_back(std::make_shared<Connection>(io_service_));
    acceptor_.async_accept(connections_.back()->socket(), std::bind(&Server::accept_handler, this, connections_.back(), std::placeholders::_1));
}

void Server::accept_handler(std::shared_ptr<Connection> con, const boost::system::error_code& ec)
{
    startAccept();

    if (!ec) {
        con->start();
    }
}

在调用 Server::startAccept 之前,我创建了一个 io_service::work 实例和一个名为 io_service_.run() 的 std::thread 池。在我调用 startAccept 之后,主线程只是等待命令行输入。

我希望我的线程池中的一个线程在连接启动时运行 Server::accept_handler。这不会发生。相反,我必须从主线程调用 io_service_.run() 。

现在我玩了一会儿,发现我可以通过这样做来实现所需的行为:

void Server::startAccept()
{
    connections_.push_back(std::make_shared<Connection>(io_service_));
    io_service_.post([this]() { acceptor_.async_accept(connections_.back()->socket(), std::bind(&Server::accept_handler, this, connections_.back(), std::placeholders::_1)); });
}

void Server::accept_handler(std::shared_ptr<Connection> con, const boost::system::error_code& ec)
{
    startAccept();

    if (!ec) {
        con->start();
    }
}

.async_* 操作和 io_service.post 有什么区别?

编辑:定义 BOOST_ASIO_ENABLE_HANDLER_TRACKING

当我编译并运行我的程序,然后使用我包含的第一个代码块连接到服务器时,这是输出:

@asio|1350656555.431359|0*1|socket@00A2F710.async_accept

当我运行包含的第二个代码块并连接到服务器时,我得到以下输出:

@asio|1350656734.789896|0*1|io_service@00ECEC78.post
@asio|1350656734.789896|>1|
@asio|1350656734.789896|1*2|socket@00D0FDE0.async_accept
@asio|1350656734.789896|<1|
@asio|1350656756.150051|>2|ec=system:0
@asio|1350656756.150051|2*3|io_service@00ECEC78.post
@asio|1350656756.150051|>3|
@asio|1350656756.150051|2*4|socket@00EE9090.async_send
@asio|1350656756.150051|3*5|socket@00D0FDE0.async_accept
@asio|1350656756.150051|2*6|socket@00EE9090.async_receive
@asio|1350656756.150051|<3|
@asio|1350656756.150051|>4|ec=system:0,bytes_transferred=54
@asio|1350656756.150051|<2|
@asio|1350656756.150051|<4|
@asio|1350656758.790803|>6|ec=system:10054,bytes_transferred=0
@asio|1350656758.790803|<6|

编辑 2:线程创建洞察力

for (int i = 0; i < NUM_WORKERS; i++) {
    thread_pool.push_back(std::shared_ptr<std::thread>(new std::thread([this]() { io_service_.run(); })));
}
4

2 回答 2

0

io_service.run函数是实际的事件循环,它的基本作用是io_service.post在循环中调用。

编辑:io_service.post文档中:

请求 io_service 调用给定的处理程序并立即返回。

io_service保证处理程序只会在当前调用成员函数的线程中被run()调用。run_one()poll() or poll_one()

应该做的是实现你自己的事件循环,调用io_service.run_one,或者调用io_service.run让Boost处理事件循环。从哪个线程运行事件循环并不重要,所有事件处理程序都将从您运行事件循环的线程调用。

于 2012-10-19T05:30:01.777 回答
0

如果您没有忘记为池中的每个线程调用 io_service::run 并且您使用 io_service::work 来避免退出 io_service::run 循环,那么您在第一种情况下的代码是绝对正确的。

这是工作示例(我忽略了连接处理和程序的正确关闭)

class Connection
{
public:
    Connection(boost::asio::io_service & io_serivce) : socket_(io_serivce) {}
    boost::asio::ip::tcp::socket & socket() { return socket_; }
private:
    boost::asio::ip::tcp::socket socket_;
};

class Server
{
public:
    Server(boost::asio::io_service & io_serivce, const std::string & addr, const std::string & port) : io_service_(io_serivce), acceptor_(io_service_) {
        boost::asio::ip::tcp::resolver resolver(io_service_);
        boost::asio::ip::tcp::resolver::query query(addr, port);
        boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();
        startAccept();
    }

    void startAccept(){
        connections_.push_back(boost::make_shared<Connection>(  boost::ref(io_service_) ));    
        acceptor_.async_accept(connections_.back()->socket(), boost::bind(&Server::accept_handler, this, connections_.back(), _1) );
    }

    void Server::accept_handler(boost::shared_ptr<Connection> con, const boost::system::error_code& ec)
    {
        std::cout << "start connection" << std::endl;
        startAccept();
    }

private:
    boost::asio::io_service & io_service_;
    boost::asio::ip::tcp::acceptor acceptor_; 
    std::vector< boost::shared_ptr<Connection> > connections_;
};

int main(int argc, char * argv[])
{
    // thread pool
    boost::thread_group threads_;   
    boost::asio::io_service io_service_;
    boost::asio::io_service::work work_(io_service_);

    const size_t kThreadsCount = 3;
    for (std::size_t i = 0; i < kThreadsCount; ++i) {
        threads_.create_thread(boost::bind(&boost::asio::io_service::run, &io_service_));
    }  

    Server s(io_service_, "127.0.0.1", "8089");

    char ch;
    std::cin >> ch;
    return 0;
}
于 2012-10-19T08:27:23.810 回答