2

我正在尝试使用 boost::asio 创建一个异步 UDP 客户端,服务器来自 asio示例

我创建了 3 个客户端(c1、c2、c3),并向服务器发送了三条不同的消息,

但是服务器端收到的消息有问题 :(

UDP客户端:

#include <cstdlib>
#include <iostream>
#include <deque>
#include <boost/bind.hpp>
#include "asio.hpp"
#include "message.hpp"

// Renders `size` bytes starting at `data` as upper-case, two-digit hex values,
// each followed by a single space (e.g. {0x01, 0xAB} -> "01 AB ").
// Returns an empty string when `size` is 0.
std::string show_hex(const char* data, size_t size)
{
    static const char digits[] = "0123456789ABCDEF";

    std::string rendered;
    rendered.reserve(size * 3);  // two digits plus one separator per byte
    for (size_t pos = 0; pos != size; ++pos) {
        // Go through unsigned char so bytes >= 0x80 are not sign-extended.
        const unsigned char byte = static_cast<unsigned char>(data[pos]);
        rendered += digits[byte >> 4];
        rendered += digits[byte & 0x0F];
        rendered += ' ';
    }
    return rendered;
}
// Copies `size` bytes starting at `data` into a std::string for display.
// NUL bytes are left out of the result; every other byte is copied as-is.
std::string show_str(const char* data, size_t size)
{
    std::string text;
    for (size_t pos = 0; pos != size; ++pos) {
        const char ch = data[pos];
        if (ch == '\0')
            continue;  // embedded NULs contribute nothing to the output
        text.push_back(ch);
    }
    return text;
}
using asio::ip::udp;
class udp_client{ 
public: 
    udp_client (asio::io_service& io_service, const std::string& host, const std::string& port) 
    :io_service_(io_service), socket_(io_service, udp::endpoint (udp::v4(), 0)) { 
        udp::resolver resolver(io_service_); 
        udp::resolver::query query(udp::v4(), host, port); 
        udp::resolver::iterator itr = resolver.resolve(query);

        sender_endpoint_ = *itr; 
    }

    ~udp_client(){ 
        socket_.close();
    }
    void send_message(const message& msg){
        io_service_.post(boost::bind(&udp_client::do_write, this, msg));
    }

    void do_write(const message& msg){
        send_msg_queue_.push_back(msg);

        if (!send_msg_queue_.empty()){
            socket_.async_send_to(
                asio::buffer(send_msg_queue_.front().data(), send_msg_queue_.front().length()),
                sender_endpoint_,
                boost::bind(&udp_client::handle_send_to, this, 
                asio::placeholders::error,
                asio::placeholders::bytes_transferred));

            std::cout<<"send data(str):"<<show_str(send_msg_queue_.front().data(), send_msg_queue_.front().length())<<std::endl;
        }
    }



    void handle_send_to(const asio::error_code& error, size_t s/*bytes_sent*/){
        std:: cout <<"bytes_sent = "<<s<<"." <<std:: endl; 

        if (!error){
            // send success, remove from the queue
            send_msg_queue_.pop_front(); 

            // recv response after send a message
            recv_message();

            if (!send_msg_queue_.empty()){
                socket_.async_send_to(
                    asio::buffer(send_msg_queue_.front().data(), send_msg_queue_.front().length()),
                    sender_endpoint_,
                    boost::bind(&udp_client::handle_send_to, this, 
                    asio::placeholders::error,
                    asio::placeholders::bytes_transferred));
            }
        }
        else{
            std::cerr<<"error in handle_send_to:"<<error<<std::endl;
            socket_.close();
        }

    }

    void recv_message(){
        socket_.async_receive_from(
            asio::buffer(data_.data(), data_.max_length()), sender_endpoint_,
            boost::bind(&udp_client::handle_receive_from, this,
            asio::placeholders::error,
            asio::placeholders::bytes_transferred));
    }

    void handle_receive_from(const asio::error_code& error, size_t bytes_recvd)
    {
        if(!error){
            std::cout<<"handle_receive_from "<<std::endl;
            std::cout<<"recv data(str):"<<show_str(data_.data(), bytes_recvd)<<std::endl;
            if(bytes_recvd > message::header_length)
                data_.body_length(bytes_recvd-message::header_length);
            //std::cout<<"message.lenght()="<<data_.length()<<"bytes_recvd="<<bytes_recvd<<std::endl;
        }
        else{
            std::cerr<<"error in handle_receive_from:"<<error<<std::endl;
            socket_.close();
        }
    }
private: 
    //enum {max_length = 1024};
    //char data_[max_length];
    message data_;
    asio::io_service& io_service_; 
    udp::socket socket_; 
    udp::endpoint sender_endpoint_;
    std::deque<message> send_msg_queue_;
};

int main (int argc, char* argv []) { 

    asio::io_service is; 

    udp_client c1(is, "localhost", "8886");
    udp_client c2(is, "localhost", "8887");
    udp_client c3(is, "localhost", "8888");
    std::string msg1 = "This is Request seq1 from c1.";
    std::string msg2 = "This is Request seq1 from c2.";
    std::string msg3 = "This is Request seq1 from c3.";

    message msg;
    msg.body_length(msg1.length());
    memcpy(msg.data(), "1234567890123456789021", message::header_length);
    memcpy(msg.body(), "This is Request seq1 from c1.", msg.body_length());

    c1.send_message(msg);

    msg.body_length(msg2.length());
    memcpy(msg.data(), "1234567890123456789022", message::header_length);
    memcpy(msg.body(), "This is Request seq1 from c2.", msg.body_length());
    c2.send_message(msg);

    msg.body_length(msg3.length());
    memcpy(msg.data(), "1234567890123456789023", message::header_length);
    memcpy(msg.body(), "This is Request seq1 from c3.", msg.body_length());
    c3.send_message(msg);

    is.run (); 

    return 0; 
}

udp 服务器代码:

// async_udp_echo_server.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2011 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include "asio.hpp"
// Formats the first `size` bytes of `data` as a hex dump: every byte becomes
// two upper-case hex digits followed by one space. An empty range yields "".
std::string show_hex(const char* data, size_t size)
{
    static const char hex_digits[] = "0123456789ABCDEF";

    std::string dump;
    dump.reserve(size * 3);  // "XX " per byte
    size_t remaining = size;
    for (const char* cursor = data; remaining != 0; ++cursor, --remaining) {
        // Mask to the low 8 bits so negative chars print as 80..FF.
        const unsigned value = static_cast<unsigned>(*cursor) & 0xFF;
        dump.push_back(hex_digits[value / 16]);
        dump.push_back(hex_digits[value % 16]);
        dump.push_back(' ');
    }
    return dump;
}
// Builds a printable string from the first `size` bytes of `data`.
// Bytes equal to '\0' are omitted; all other bytes are appended unchanged.
std::string show_str(const char* data, size_t size)
{
    std::string result;
    const char* const stop = data + size;
    for (const char* cursor = data; cursor != stop; ++cursor) {
        if (*cursor != '\0')  // NUL bytes produce no output
            result += *cursor;
    }
    return result;
}
using asio::ip::udp;

class udp_server
{
public:
  udp_server(asio::io_service& io_service, short port)
    : io_service_(io_service),
      socket_(io_service, udp::endpoint(udp::v4(), port))
  {
    memset(data_, 0, max_length);

    socket_.async_receive_from(
        asio::buffer(data_, max_length), sender_endpoint_, 
        boost::bind(&udp_server::handle_receive_from, this, 
          asio::placeholders::error,
          asio::placeholders::bytes_transferred));
  }

  void handle_receive_from(const asio::error_code& error,
      size_t bytes_recvd)
  {
    if (!error && bytes_recvd > 0)
    {
        std::cout<<"recv data(size="<<bytes_recvd<<"):"
            <<show_str(data_, bytes_recvd)<<endl
            <<"["<<show_hex(data_, bytes_recvd)<<"]"<<std::endl;

        socket_.async_send_to(
            asio::buffer(data_, bytes_recvd), sender_endpoint_, 
            boost::bind(&udp_server::handle_send_to, this,
            asio::placeholders::error,
            asio::placeholders::bytes_transferred));
    }
    else
    {
      socket_.async_receive_from(
          asio::buffer(data_, max_length), sender_endpoint_,
          boost::bind(&udp_server::handle_receive_from, this,
            asio::placeholders::error,
            asio::placeholders::bytes_transferred));
    }
  }

  void handle_send_to(const asio::error_code& /*error*/,
      size_t s/*bytes_sent*/)
  {
    socket_.async_receive_from(
        asio::buffer(data_, max_length), sender_endpoint_,
        boost::bind(&udp_server::handle_receive_from, this,
        asio::placeholders::error,
        asio::placeholders::bytes_transferred));
    memset(data_, 0, max_length);
  }

private:
  asio::io_service& io_service_;
  udp::socket socket_;
  udp::endpoint sender_endpoint_;
  enum { max_length = 1024 };
  char data_[max_length];
};

// Entry point: async_udp_echo_server <port>
// Validates the port argument, starts the echo server and runs the io_service.
// Returns 1 on a usage error or an invalid port, 0 otherwise.
int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: async_udp_echo_server <port>\n";
      return 1;
    }

    // strtol instead of atoi so that non-numeric or out-of-range input is
    // rejected instead of silently becoming some other port number.
    char* end = 0;
    const long port = std::strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || port < 0 || port > 65535)
    {
      std::cerr << "Invalid port: " << argv[1] << "\n";
      return 1;
    }

    asio::io_service io_service;

    // udp_server takes a short; the value is converted the same way the
    // previous implicit int-to-short conversion did.
    udp_server s(io_service, static_cast<short>(port));

    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}

消息类:

// message.hpp
// ~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2011 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef MESSAGE_HPP
#define MESSAGE_HPP

#include <cstdio>
#include <cstdlib>
#include <cstring>

// Fixed-capacity message buffer laid out as
//   [ header_length bytes of header ][ up to max_body_length bytes of body ].
// The object only tracks the current body length; callers fill the header and
// body bytes through data() / body().
class message
{
public:
  enum { header_length = 22 };
  enum { max_body_length = 1024 };

  // Creates an empty message: zero body length and a zero-filled buffer.
  message()
    : data_(), body_length_(0)
  {
  }

  // Start of the whole buffer (header followed by body).
  const char* data() const
  {
    return data_;
  }

  char* data()
  {
    return data_;
  }

  // Capacity of the whole buffer in bytes.
  size_t max_length() const
  {
    return header_length + max_body_length;
  }

  // Number of bytes currently in use: header plus current body.
  size_t length() const
  {
    return header_length + body_length_;
  }

  // Start of the body, immediately after the header.
  const char* body() const
  {
    return data_ + header_length;
  }

  char* body()
  {
    return data_ + header_length;
  }

  size_t body_length() const
  {
    return body_length_;
  }

  // Sets the body length, clamped to max_body_length.
  void body_length(size_t length)
  {
    body_length_ = length;
    if (body_length_ > max_body_length)
      body_length_ = max_body_length;
  }

  // Reads the body length from the header, where encode_header() stores it as
  // decimal text in the leading 4 bytes. Returns false and resets the body
  // length to 0 when the parsed value is negative or exceeds max_body_length.
  // A header with no leading number parses as 0.
  bool decode_header()
  {
    using namespace std; // For strncat and strtol.
    char header[header_length + 1] = "";
    strncat(header, data_, header_length);
    // strtol rather than atoi: a header full of digits would overflow int,
    // which is undefined for atoi; strtol saturates and is rejected below.
    const long parsed = strtol(header, 0, 10);
    if (parsed < 0 || parsed > max_body_length)
    {
      body_length_ = 0;
      return false;
    }
    body_length_ = static_cast<size_t>(parsed);
    return true;
  }

  // Writes the body length into the header as decimal text, right-aligned in
  // 4 characters; the rest of the header is filled with NUL bytes.
  void encode_header()
  {
    using namespace std; // For sprintf and memcpy.
    char header[header_length + 1] = "";
    // body_length_ never exceeds max_body_length, so it fits in an int; the
    // cast makes the argument match the %d conversion.
    sprintf(header, "%4d", static_cast<int>(body_length_));
    memcpy(data_, header, header_length);
  }

private:
  char data_[header_length + max_body_length];
  size_t body_length_;
};
4

1 回答 1

2

您的客户端代码的 do_write 方法不正确。const_buffer 缓冲区并不保存数据,它只是把指针和长度保存在一起的包装器。c_str 方法在堆栈上分配数据,当函数返回时,该指针就会失效。正确的做法是传递数据指针,或者使用正确的重载:

asio::buffer(msg.data(), msg.length()); 

但是在 &udp_client::handle_send_to 被调用之前,您不得在 msg 上调用任何非 const 方法。否则,您应该将字符串复制到 data_ 缓冲区。

如果这不是最后一个错误,请更新您的问题。

于 2012-05-14T13:43:55.397 回答