有太多未知数无法从发布的代码中确定延迟的根本原因。然而,有一些方法和考虑可以帮助确定问题:
- 为 Boost.Asio 1.47+启用处理程序跟踪。只需定义
BOOST_ASIO_ENABLE_HANDLER_TRACKING
,Boost.Asio 就会将调试输出(包括时间戳)写入标准错误流。这些时间戳可用于帮助过滤掉由应用程序代码( 、 等)引入的parseHeader()
延迟parsePacket()
。
- 验证是否正确处理了字节排序。例如,如果协议将标头的
size
字段定义为网络字节顺序中的两个字节,并且服务器将该字段作为原始短格式处理,则在接收到正文大小为 的消息时10
:
- 大端机器将调用
async_read
读取10
字节。读取操作应该很快完成,因为套接字已经有10
可供读取的字节体。
- little-endian 机器将调用
async_read
读取2560
字节。读取操作可能仍然未完成,因为尝试读取的字节数比预期的要多得多。
- 使用跟踪工具,例如strace、ltrace等。
- 修改 Boost.Asio,在整个调用堆栈中添加时间戳。Boost.Asio 仅作为头文件库提供。因此,用户可以对其进行修改以提供所需的详细程度。虽然不是最干净或最简单的方法,但在整个调用堆栈中添加带有时间戳的打印语句可能有助于提供对时间的可见性。
- 尝试在一个简短、简单、自包含的示例中复制该行为。从最简单的示例开始,以确定延迟是否是系统性的。然后,对示例进行迭代扩展,使其每次迭代都更接近真实代码。
这是我开始的一个简单示例:
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
class tcp_server
: public boost::enable_shared_from_this< tcp_server >
{
private:
enum
{
header_size = 4,
data_size = 10,
buffer_size = 1024,
max_stamp = 50
};
typedef boost::asio::ip::tcp tcp;
public:
typedef boost::array< boost::posix_time::ptime, max_stamp > time_stamps;
public:
tcp_server( boost::asio::io_service& service,
unsigned short port )
: strand_( service ),
acceptor_( service, tcp::endpoint( tcp::v4(), port ) ),
socket_( service ),
index_( 0 )
{}
/// @brief Returns collection of timestamps.
time_stamps& stamps()
{
return stamps_;
}
/// @brief Start the server.
void start()
{
acceptor_.async_accept(
socket_,
boost::bind( &tcp_server::handle_accept, this,
boost::asio::placeholders::error ) );
}
private:
/// @brief Accept connection.
void handle_accept( const boost::system::error_code& error )
{
if ( error )
{
std::cout << error.message() << std::endl;
return;
}
read_header();
}
/// @brief Read header.
void read_header()
{
boost::asio::async_read(
socket_,
boost::asio::buffer( buffer_, header_size ),
boost::bind( &tcp_server::handle_read_header, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred ) );
}
/// @brief Handle reading header.
void
handle_read_header( const boost::system::error_code& error,
std::size_t bytes_transferred )
{
if ( error )
{
std::cout << error.message() << std::endl;
return;
}
// If no more stamps can be recorded, then stop the async-chain so
// that io_service::run can return.
if ( !record_stamp() ) return;
// Read data.
boost::asio::async_read(
socket_,
boost::asio::buffer( buffer_, data_size ),
boost::bind( &tcp_server::handle_read_data, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred ) );
}
/// @brief Handle reading data.
void handle_read_data( const boost::system::error_code& error,
std::size_t bytes_transferred )
{
if ( error )
{
std::cout << error.message() << std::endl;
return;
}
// If no more stamps can be recorded, then stop the async-chain so
// that io_service::run can return.
if ( !record_stamp() ) return;
// Start reading header again.
read_header();
}
/// @brief Record time stamp.
bool record_stamp()
{
stamps_[ index_++ ] = boost::posix_time::microsec_clock::local_time();
return index_ < max_stamp;
}
private:
boost::asio::io_service::strand strand_;
tcp::acceptor acceptor_;
tcp::socket socket_;
boost::array< char, buffer_size > buffer_;
time_stamps stamps_;
unsigned int index_;
};
int main()
{
boost::asio::io_service service;
// Create and start the server.
boost::shared_ptr< tcp_server > server =
boost::make_shared< tcp_server >( boost::ref(service ), 33333 );
server->start();
// Run. This will exit once enough time stamps have been sampled.
service.run();
// Iterate through the stamps.
tcp_server::time_stamps& stamps = server->stamps();
typedef tcp_server::time_stamps::iterator stamp_iterator;
using boost::posix_time::time_duration;
for ( stamp_iterator iterator = stamps.begin() + 1,
end = stamps.end();
iterator != end;
++iterator )
{
// Obtain the delta between the current stamp and the previous.
time_duration delta = *iterator - *(iterator - 1);
std::cout << "Delta: " << delta.total_milliseconds() << " ms"
<< std::endl;
}
// Calculate the total delta.
time_duration delta = *stamps.rbegin() - *stamps.begin();
std::cout << "Total"
<< "\n Start: " << *stamps.begin()
<< "\n End: " << *stamps.rbegin()
<< "\n Delta: " << delta.total_milliseconds() << " ms"
<< std::endl;
}
关于实现的几点说明:
- 只有一个线程(主)和一个异步链read_header->handle_read_header->handle_read_data。这应该最大限度地减少准备运行的处理程序花费在等待可用线程上的时间。
- 专注于
boost::asio::async_read
,噪声通过以下方式最小化:
- 使用预先分配的缓冲区。
- 不使用
shared_from_this()
or strand::wrap
。
- 记录时间戳,并在收集后执行处理。
我使用 gcc 4.4.0 和 Boost 1.50 在 CentOS 5.4 上编译。为了驱动数据,我选择使用netcat发送 1000 个字节:
$ ./a.out > 输出 &
[1] 18623
$ echo "$(for i in {0..1000}; do echo -n "0"; done)" | 数控 127.0.0.1 33333
[1]+ 完成 ./a.out >输出
$尾输出
增量:0 毫秒
增量:0 毫秒
增量:0 毫秒
增量:0 毫秒
增量:0 毫秒
增量:0 毫秒
全部的
开始时间:2012-9-10 21:22:45.585780
结束:2012-9-10 21:22:45.586716
增量:0 毫秒
boost::asio::async_read
没有发现任何延迟,我通过修改调用、替换this
和shared_from_this()
包装ReadHandlers
s来扩展示例strand_.wrap()
。我运行了更新的示例,但仍然没有观察到延迟。不幸的是,这是我根据问题中发布的代码所能得到的。
考虑扩展该示例,在每次迭代中添加实际实现中的一部分。例如:
- 首先使用
msg
变量的类型来控制缓冲区。
- 接下来,发送有效数据,并介绍
parseHeader()
和parsePacket
功能。
- 最后介绍一下
lib::GET_SERVER_TIME()
印刷品。
如果示例代码尽可能接近真实代码,并且没有观察到延迟boost::asio::async_read
,则ReadHandler
s 可能已准备好在真实代码中运行,但它们正在等待同步(链)或资源(一个线程),导致延迟:
- 如果延迟是与链同步的结果,那么考虑Robin的建议,通过读取更大的数据块来潜在地减少每条消息所需的读取量。
- 如果延迟是等待线程的结果,则考虑进行额外的线程调用
io_service::run()
。