2

我一直在阅读一些 Boost ASIO 教程。到目前为止,我的理解是整个发送和接收都是一个循环,只能迭代一次。请看下面的简单代码:

客户端.cpp:

#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <iostream>
#include <string>

boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::socket sock(io_service);
boost::array<char, 4096> buffer;

void read_handler(const boost::system::error_code &ec, std::size_t bytes_transferred)
{
  if (!ec)
  {
    std::cout << std::string(buffer.data(), bytes_transferred) << std::endl;
    sock.async_read_some(boost::asio::buffer(buffer), read_handler);
  }
}

void connect_handler(const boost::system::error_code &ec)
{
  if (!ec)
  {        
    sock.async_read_some(boost::asio::buffer(buffer), read_handler);
  }
}

void resolve_handler(const boost::system::error_code &ec, boost::asio::ip::tcp::resolver::iterator it)
{
  if (!ec)
  {
    sock.async_connect(*it, connect_handler);
  }
}

int main()
{
  boost::asio::ip::tcp::resolver::query query("localhost", "2013");
  resolver.async_resolve(query, resolve_handler);
  io_service.run();
}

程序resolves一个地址,connects到服务器和reads数据,最后ends在没有数据的时候。我的问题:我怎样才能继续这个循环?我的意思是,如何在我的应用程序的整个生命周期中保持客户端和服务器之间的这种连接,以便服务器在它东西要发送时发送数据?我试图打破这个圈子,但所有的接缝都被困在里面io_service.run() 同样的问题也适用于我的服务器:

服务器.cpp:

#include <boost/asio.hpp>
#include <string>

boost::asio::io_service io_service;
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), 2013);
boost::asio::ip::tcp::acceptor acceptor(io_service, endpoint);
boost::asio::ip::tcp::socket sock(io_service);
std::string data = "Hello, world!";

void write_handler(const boost::system::error_code &ec, std::size_t bytes_transferred)
{
}

void accept_handler(const boost::system::error_code &ec)
{
  if (!ec)
  {
    boost::asio::async_write(sock, boost::asio::buffer(data), write_handler);
  }
}

int main()
{
  acceptor.listen();
  acceptor.async_accept(sock, accept_handler);
  io_service.run();
}

这只是一个例子。在实际应用程序中,我可能希望保持套接字打开并将其重用于其他数据交换(读取和写入)。我该怎么做。

我重视你的善意评论。如果您参考了一些解决此问题的简单解决方案,如果您提及它,我将不胜感激。谢谢

更新(服务器示例代码)

根据下面给出的答案(更新 2),我编写了服务器代码。请注意,代码已简化(尽管可以编译和运行)。另请注意,io_service 永远不会返回,因为它总是在等待新的连接。这就是io_service.run永不回归和永远运行的方式。每当您希望 io_service.run 返回时,只需让接受者不再接受即可。请以我目前不记得的许多方式之一来做这件事。(说真的,我们如何以干净的方式做到这一点?:))

请享用:

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <string>
#include <iostream>
#include <vector>
#include <time.h>
boost::asio::io_service io_service;
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), 2013);
boost::asio::ip::tcp::acceptor acceptor(io_service, endpoint);
//boost::asio::ip::tcp::socket sock(io_service);
std::string data = "Hello, world!";

class Observer;
std::vector<Observer*> observers;

class Observer
{
public:
    Observer(boost::asio::ip::tcp::socket *socket_):socket_obs(socket_){}
    void notify(std::string data)
    {
        std::cout << "notify called data[" << data << "]" << std::endl;
        boost::asio::async_write(*socket_obs, boost::asio::buffer(data) , boost::bind(&Observer::write_handler, this,boost::asio::placeholders::error));
    }
    void write_handler(const boost::system::error_code &ec)
    {
         if (!ec) //no error: done, just wait for the next notification
             return;
         socket_obs->close(); //client will get error and exit its read_handler
         observers.erase(std::find(observers.begin(), observers.end(),this));
         std::cout << "Observer::write_handler  returns as nothing was written" << std::endl;
    }
private:
        boost::asio::ip::tcp::socket *socket_obs;
};


class server
{
public:
     void CreatSocketAndAccept()
     {
          socket_ = new boost::asio::ip::tcp::socket(io_service);
          observers.push_back(new Observer(socket_));
          acceptor.async_accept(*socket_,boost::bind(&server::handle_accept, this,boost::asio::placeholders::error));
     }
      server(boost::asio::io_service& io_service)
      {
          acceptor.listen();
          CreatSocketAndAccept();
      }

      void handle_accept(const boost::system::error_code& e)
      {
          CreatSocketAndAccept();
      }
private:
  boost::asio::ip::tcp::socket *socket_;
};

class Agent
{
public:
    void update(std::string data)
    {
        if(!observers.empty())
        {
//          std::cout << "calling notify data[" << data << "]" << std::endl;
            observers[0]->notify(data);
        }
    }

};
Agent agent;
void AgentSim()
{
    int i = 0;
    sleep(10);//wait for me to start client
    while(i++ < 10)
    {
        std::ostringstream out("");
        out << data << i ;
//      std::cout << "calling update data[" << out.str() << "]" << std::endl;
        agent.update(out.str());
        sleep(1);
    }
}
void run()
{
    io_service.run();
    std::cout << "io_service returned" << std::endl;
}
int main()
{
    server server_(io_service);

    boost::thread thread_1(AgentSim);
    boost::thread thread_2(run);
    thread_2.join();
    thread_1.join();
}
4

1 回答 1

5

您可以简化基于 asio 的 porgram 的逻辑,如下所示:每个调用 async_X 函数的函数都提供一个处理程序。这有点像状态机的状态之间的转换,其中处理程序是状态,而异步调用是状态之间的转换。只是退出处理程序而不调用 async_* 函数就像转换到结束状态。程序“所做”的一切(发送数据、接收数据、连接套接字等)都发生在转换期间。

如果你这样看,你的客户看起来像这样(只有“好路径”,即没有错误):

<start>         --(resolve)----> resolve_handler
resolve_handler --(connect)----> connect_handler
connect_handler --(read data)--> read_handler
read_handler    --(read data)--> read_handler

你的服务器是这样的:

<start>         --(accept)-----> accept handler
accept_handler  --(write data)-> write_handler
write_handler   ---------------> <end>

由于您write_handler没有做任何事情,它会转换到最终状态,即ioservice::run返回。现在的问题是,在将数据写入套接字之后,您想做什么?根据这一点,您将必须定义一个相应的转换,即执行您想要执行的操作的异步调用。

更新: 从您的评论中,我看到您想等待下一个数据准备好,即下一个刻度。然后过渡如下所示:

write_handler   --(wait for tick/data)--> dataready
dataready       --(write data)----------> write_handler

你看,这引入了一个新的状态(处理程序),我称之为它dataready,你也可以称之为它tick_handler或其他东西。转换回 write_handler 很简单:

void dataready()
{
  // get the new data...
  async_write(sock, buffer(data), write_handler);
}

在某些计时器上,从 的过渡write_handler可以很简单。async_wait如果数据来自“外部”并且您不知道它们何时准备好,请等待一段时间,检查数据是否在那里,如果没有,请再等待一段时间:

write_handler    --(wait some time)--> checkForData
checkForData:no  --(wait some time)--> checkForData
checkForData:yes --(write data)------> write_handler

或者,在(伪)代码中:

void write_handler(const error_code &ec, size_t bytes_transferred)
{
  //...
  async_wait(ticklenght, checkForData);
}

void checkForData(/*insert wait handler signature here*/)
{
  if (dataIsReady())
  {
    async_write(sock, buffer(data), write_handler);
  }
  else
  {
    async_wait(shortTime, checkForData):
  }
}

更新 2: 根据您的评论,您已经有一个以某种方式进行时间管理的代理(调用 update every tick)。以下是我将如何解决这个问题:

  1. 让代理拥有一个观察者列表,当更新调用中有新数据时,这些观察者会得到通知。
  2. 让每个观察者处理一个客户端连接(套接字)。
  3. 让服务器等待传入的连接,从它们创建观察者并将它们注册到代理。

我对 ASIO 的确切语法不是很确定,所以这将是手动伪代码:

服务器:

void Server::accept_handler()
{
    obs = new Observer(socket);
    agent.register(obs);
    new socket; //observer takes care of the old one
    async_accept(..., accept_handler);
}

代理人:

void Agent::update()
{
    if (newDataAvailable())
    {
        for (auto& obs : observers)
        {
             obs->notify(data);
        }
    }
}

观察员:

void Observer::notify(data)
{
    async_write(sock, data, write_handler);
}

void Observer::write_handler(error_code ec, ...)
{
     if (!ec) //no error: done, just wait for the next notification
         return;
     //on error: close the connection and unregister
     agent.unregister(this);
     socket.close(); //client will get error and exit its read_handler
}
于 2013-04-05T08:32:47.803 回答