介绍:
我正在尝试使用 C++ 和 boost::asio 编写一个服务器/客户端应用程序:一端使用同步写入发送数据,另一端使用连续的异步读取来接收。例如,我的服务器先异步读取一段固定字节长度的数据,读取完成后再继续读取客户端通过同步写入发送的下一段字节。
问题:
在连续的异步读取中,第一段字节能够被正确读取;但当继续进行下一次异步读取时,虽然从另一端收到的字节数与我期望的长度相同,内容却是垃圾数据,或者说我无法把它转换成有意义的数据。
代码:
private:
// Sizing constants for one receive buffer.
static const int MAX_MTU = 1500; // Ethernet Maximum Transmission Unit (MTU), in bytes.
static const int TRASH_CAN = 100; // Amount to subtract from MAX_MTU to make room for basic packet structure elements.
static const int BUFFER_SIZE = MAX_MTU - TRASH_CAN; // Size of one receive buffer: 1500 - 100 = 1400 bytes.
// Locked by write_STRING() for the duration of its write_packet() calls.
boost::mutex mutex;
// Raw pointer to a block of bytes.
typedef char* data_bytes;
// Receive buffers: read_packet() pushes the buffer for the read it starts and
// the read handlers fetch it with data.back().
std::list<data_bytes> data;
private:
// Socket used for every read and write of this session.
tcp::socket socket_;
// The io_service handed to the constructor.
boost::asio::io_service& io_service_;
// Allocated with new in both constructors; supplies HEADER_LENGTH, SIZE_LENGTH,
// HEADER_STRING, HEADER_COMMAND and is_header() to the read handlers.
// NOTE(review): no matching delete is visible in this part of the file — confirm ownership.
moqane::tcp_packet *packet_;
// Server-side constructor: creates a session for an incoming connection from a
// client. The socket is created unconnected on the given io_service.
public: tcp_session(boost::asio::io_service& io_service)
    // Initializers follow the member declaration order (socket_, io_service_, packet_).
    : socket_(io_service),
      io_service_(io_service),
      packet_(new moqane::tcp_packet())
{
    // ......
}
// Client-side constructor: creates a session for an outgoing connection to the
// server and immediately starts an asynchronous connect to the resolved
// endpoints. connect_to_server_hndlr() runs when the connect attempt finishes.
//
// NOTE(review): the completion handler is bound to the raw `this` pointer, so
// the session must stay alive until the connect completes — confirm the caller
// guarantees that.
public: tcp_session(boost::asio::io_service& io_service, tcp::resolver::iterator endpoint_iterator)
    // Initializers follow the member declaration order (socket_, io_service_, packet_).
    : socket_(io_service),
      io_service_(io_service),
      packet_(new moqane::tcp_packet())
{
    boost::asio::async_connect(
        socket_,
        endpoint_iterator,
        boost::bind(&tcp_session::connect_to_server_hndlr,
                    this,
                    boost::asio::placeholders::error));
}
// Gives callers direct access to the session's underlying TCP socket.
tcp::socket& socket() { return socket_; }
public:
void read_header()
{
read_packet(packet_->HEADER_LENGTH,
boost::bind(
&moqane::tcp_session::read_header_hndlr,
shared_from_this(),
packet_->HEADER_LENGTH,
boost::asio::placeholders::error)
);
}
private:
// Completion handler for the asynchronous connect started by the client-side
// constructor. On success it begins reading the first packet header.
void connect_to_server_hndlr(const boost::system::error_code& error)
{
    if (error)
    {
        // TODO: fire the error event...
        return;
    }

    read_header();
}
private:
template <class T>
void read_packet(int packet_length, T handler)
{
/*
Packet Structure: (Mohamed Tarek (moqane))
==============================================================================================
HEADER: could be a 3 or 4 bytes that give enough information to the endpoint on how to receive
those bytes.
SIZE : A 4 byte-length number that tell the endpoint how much data in bytes ar comming.
ex: 0340 which should tell us that the incoming data are 340 bytes, so we will not
receive more or less than that.
DATA : the incoming valuable information that we want to receive in the first place.
---------------------------------------------
| | | |
| HEADER | SIZE | DATA... | ----> PACKET
| 3b | 4b | N/A b |
----------------------------------------------
==============================================================================================
*/
if (data.size() > 0)
{
data.clear();
}
char d[moqane::tcp_session::BUFFER_SIZE];
data.push_back(d);
boost::asio::async_read(socket_,
boost::asio::buffer(data.back(), packet_length),
handler
);
}
private:
// Synchronously writes the NUL-terminated string data_bytes to socket_; the
// terminating NUL itself is not sent. A null pointer is ignored.
//
// The length has to be measured from the string: sizeof on a char* yields the
// size of the pointer (4 or 8), not the size of the payload, which would send
// the wrong number of bytes and desynchronize the reader.
// Limitation: because only a pointer is passed in, payloads that contain
// embedded NUL bytes cannot be sent through this function.
void write_packet(char* data_bytes)
{
    if (!data_bytes)
    {
        return;
    }

    boost::asio::write(socket_,
                       boost::asio::buffer(data_bytes,
                                           std::char_traits<char>::length(data_bytes)));
}
private:
// Completion handler for a write. It currently takes no action on either
// outcome; both paths are placeholders.
void write_packet_hndlr(const boost::system::error_code& error)
{
    if (error)
    {
        // Write failed: nothing is done yet.
        return;
    }

    // Write succeeded: nothing is done yet.
}
private:
// Completion handler for the HEADER read.
//
// packet_length: number of header bytes that were requested.
// error:         result of the asynchronous read.
//
// A recognized header leads to a read of the SIZE field; an unrecognized one
// makes the session read another header.
void read_header_hndlr(int packet_length, const boost::system::error_code& error)
{
    if (error)
    {
        // TODO: fire the error event...
        return;
    }

    // The header bytes are in the most recent receive buffer.
    std::string header(data.back(), packet_length);

    if (!packet_->is_header(header))
    {
        // Not a valid header: read the HEADER field again.
        read_header();
        return;
    }

    // Valid header: read the SIZE field next and carry the header along.
    const int size_length = packet_->SIZE_LENGTH;
    read_packet(size_length,
                boost::bind(&moqane::tcp_session::read_size_hndlr,
                            shared_from_this(),
                            size_length,
                            header,
                            "",
                            boost::asio::placeholders::error));
}
private:
// Completion handler for the SIZE read.
//
// packet_length: number of size bytes that were requested.
// header:        the header string received just before this SIZE field.
// info:          not used by this handler.
// error:         result of the asynchronous read.
//
// Converts the SIZE field to a number and, for a string packet, starts the read
// of that many DATA bytes. The size comes from the peer, so it is rejected
// unless it lies in 1..BUFFER_SIZE; a larger value would not fit in a receive
// buffer. A rejected size or an unknown header makes the session read a new
// header.
void read_size_hndlr(int packet_length, std::string header, std::string info, const boost::system::error_code& error)
{
    if (error)
    {
        // TODO: fire the error event...
        return;
    }

    // The size digits are in the most recent receive buffer.
    std::string str_length(data.back(), packet_length);
    const int next_packet_length = moqane::number2string::ToInt(str_length);

    if (next_packet_length <= 0 || next_packet_length > moqane::tcp_session::BUFFER_SIZE)
    {
        // Not a valid size: read the HEADER field again.
        read_header();
        return;
    }

    if (header == packet_->HEADER_STRING)
    {
        read_packet(next_packet_length,
                    boost::bind(&moqane::tcp_session::read_STRING_hndlr,
                                shared_from_this(),
                                next_packet_length,
                                boost::asio::placeholders::error));
    }
    else if (header == packet_->HEADER_COMMAND)
    {
        // Command packets are not handled yet; no further read is started here.
    }
    else
    {
        // Not a valid header: read the HEADER field again.
        read_header();
    }
}
private:
// Completion handler for the DATA read of a string packet.
//
// packet_length: number of payload bytes that were requested.
// error:         result of the asynchronous read.
//
// The payload is only extracted when the read succeeded; after a failed read
// the receive buffer does not hold a complete payload.
void read_STRING_hndlr(int packet_length, const boost::system::error_code& error)
{
    if (error)
    {
        // TODO: fire the error event...
        return;
    }

    // The payload bytes are in the most recent receive buffer.
    std::string std_str(data.back(), packet_length);

    // TODO: deliver std_str to the application. No further read is started
    // here, so the receive chain ends after this packet.
}
public:
// Sends one string packet as three consecutive synchronous writes (header,
// size, data) while holding the session mutex.
//
// NOTE(review): string_data is not used; the three fields sent are fixed
// literals — confirm whether the argument is meant to be the payload.
void write_STRING(char* string_data)
{
    // Held until the function returns, so the three writes stay together.
    boost::mutex::scoped_lock lock(mutex);

    write_packet(moqane::number2string::To_CharArray("STR"));   // header
    write_packet(moqane::number2string::To_CharArray("xxx1"));  // size
    write_packet(moqane::number2string::To_CharArray("a"));     // data
}
};