We are seeing a strange problem with async_write on Solaris when sending messages larger than 65536 bytes (we do not observe the problem on Linux). We are using boost version 1.47 with the Studio 12 CC compiler on Solaris and g++ 4.4.3 on Linux.
When a message larger than 65536 bytes is sent (occasionally we have also seen the problem at around 50K bytes, but at 65536 bytes it is always reproducible), the message is delivered correctly to the receiver. However, once the message has been sent, the io service thread on the sender goes into a busy-wait loop. Running "truss" on the sender pid continuously shows the following calls:
/6: recvmsg(57, 0xFFFFFD7FEB9FDDF0, 0) Err#11 EAGAIN
/6: write(13, " 9\0\0\019\0\0\0", 8) = 8
/6: ioctl(13, DP_POLL, 0xFFFFFD7FEB9FE460) = 1
/6: recvmsg(57, 0xFFFFFD7FEB9FDDF0, 0) Err#11 EAGAIN
/6: write(13, " 9\0\0\019\0\0\0", 8) = 8
/6: ioctl(13, DP_POLL, 0xFFFFFD7FEB9FE460) = 1
/6: recvmsg(57, 0xFFFFFD7FEB9FDDF0, 0) Err#11 EAGAIN
/6: write(13, " 9\0\0\019\0\0\0", 8) = 8
/6: ioctl(13, DP_POLL, 0xFFFFFD7FEB9FE460) = 1
/6: recvmsg(57, 0xFFFFFD7FEB9FDDF0, 0) Err#11 EAGAIN
/6: write(13, " 9\0\0\019\0\0\0", 8) = 8
/6: ioctl(13, DP_POLL, 0xFFFFFD7FEB9FE460) = 1
/6: recvmsg(57, 0xFFFFFD7FEB9FDDF0, 0) Err#11 EAGAIN
/6: write(13, " 9\0\0\019\0\0\0", 8) = 8
/6: ioctl(13, DP_POLL, 0xFFFFFD7FEB9FE460) = 1
"lsof" shows that socket descriptor 57 above is the socket connected to the receiver.
Thread 6 here is the ioservice thread; its stack trace is -
thread t@6 -- ????????
thread_proxy
boost::detail::thread_data<>::run
des::tunnel::ServiceScheduler::processServiceWork
boost::asio::io_service::run
boost::asio::detail::task_io_service::run
boost::asio::detail::task_io_service::do_one
boost::asio::detail::dev_poll_reactor::run
The busy-wait loop continues until we stop the receiver. Also, as mentioned earlier, the problem is always reproducible for messages larger than 65536 (2^16) bytes when the sender and receiver run on two different Solaris hosts. When the sender and receiver both run on the same Solaris host, the problem is always reproducible at 131072 (2^17) bytes.
Can someone explain why the ioservice thread here keeps trying to read from the receiver, even though it has already sent the message successfully and the receiver is not actually writing anything on the socket? Please let me know if anyone has run into this problem or can suggest a solution.
Here is code to reproduce the problem (I simply changed the boost async TCP daytime server example to return a message of the specified number of bytes instead of the daytime, and to put the client connection into read mode):
#include <ctime>
#include <iostream>
#include <cstdlib>
#include <string>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/asio.hpp>
#include <boost/scoped_array.hpp>

using boost::asio::ip::tcp;

std::string make_message(unsigned int message_size)
{
    std::string data(message_size, 'A');
    return data;
}
class tcp_connection : public boost::enable_shared_from_this<tcp_connection>
{
public:
    typedef boost::shared_ptr<tcp_connection> pointer;

    static pointer create(boost::asio::io_service& io_service, unsigned int message_size)
    {
        return pointer(new tcp_connection(io_service, message_size));
    }

    tcp::socket& socket()
    {
        return socket_;
    }

    void handleMessage(const boost::system::error_code& message_error)
    {
        if (message_error) {
            std::cout << "Error while reading the message from client" << std::endl;
        }
        else {
            std::cout << "Read the message from client" << std::endl;
        }
    }

    void start()
    {
        // Put the client connection into read mode: perform an async_read
        // for a 5-byte header from the client.
        _header.reset(new char[6]);
        _header[5] = '\0';
        boost::asio::async_read(socket_, boost::asio::buffer(_header.get(), 5),
            boost::bind(&tcp_connection::handleMessage, shared_from_this(),
                boost::asio::placeholders::error));

        // Send a message of the requested size back to the client. message_
        // is a member, so it stays alive until the write completes.
        message_ = make_message(message_size_);
        boost::asio::async_write(socket_, boost::asio::buffer(message_),
            boost::bind(&tcp_connection::handle_write, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }

private:
    tcp_connection(boost::asio::io_service& io_service, unsigned int message_size)
        : socket_(io_service), message_size_(message_size)
    {
    }

    void handle_write(const boost::system::error_code& /*error*/,
                      size_t bytes_transferred)
    {
        std::cout << "Bytes written: " << bytes_transferred << std::endl;
    }

    tcp::socket socket_;
    std::string message_;
    boost::scoped_array<char> _header;
    unsigned int message_size_;
};
class tcp_server
{
public:
    tcp_server(boost::asio::io_service& io_service, unsigned int port,
               unsigned int message_size)
        : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
          message_size_(message_size)
    {
        start_accept();
    }

private:
    void start_accept()
    {
        tcp_connection::pointer new_connection =
            tcp_connection::create(acceptor_.get_io_service(), message_size_);
        acceptor_.async_accept(new_connection->socket(),
            boost::bind(&tcp_server::handle_accept, this, new_connection,
                boost::asio::placeholders::error));
    }

    void handle_accept(tcp_connection::pointer new_connection,
                       const boost::system::error_code& error)
    {
        if (!error)
        {
            new_connection->start();
            start_accept();
        }
    }

    tcp::acceptor acceptor_;
    unsigned int message_size_;
};
int main(int argc, char* argv[])
{
    if (argc != 3) {
        std::cerr << "Usage: server port message_size" << std::endl;
        return 1;
    }

    unsigned int port = boost::lexical_cast<unsigned int>(argv[1]);
    unsigned int message_size = boost::lexical_cast<unsigned int>(argv[2]);

    try {
        boost::asio::io_service io_service;
        tcp_server server(io_service, port, message_size);
        io_service.run();
    }
    catch (std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
For the client, we can use the boost synchronous TCP daytime client example (changed to accept the port as an argument):
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

int main(int argc, char* argv[])
{
    try {
        if (argc != 3) {
            std::cerr << "Usage: client <host> <port>" << std::endl;
            return 1;
        }

        boost::asio::io_service io_service;
        tcp::resolver resolver(io_service);
        tcp::resolver::query query(argv[1], argv[2]);
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        tcp::resolver::iterator end;

        tcp::socket socket(io_service);
        boost::system::error_code error = boost::asio::error::host_not_found;
        while (error && endpoint_iterator != end) {
            socket.close();
            socket.connect(*endpoint_iterator++, error);
        }
        if (error)
            throw boost::system::system_error(error);

        for (;;) {
            boost::array<char, 128> buf;
            boost::system::error_code error;
            size_t len = socket.read_some(boost::asio::buffer(buf), error);
            if (error == boost::asio::error::eof)
                break; // Connection closed cleanly by peer.
            else if (error)
                throw boost::system::system_error(error); // Some other error.
            std::cout.write(buf.data(), len);
        }
    }
    catch (std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
Here is how I run the server and client on two different Solaris hosts -
server 9081 200000
client <server_host> 9081
After starting the server and client, run "truss" on the server; it shows the tight polling loop described above.
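Since the stack trace ends in boost::asio::detail::dev_poll_reactor, one diagnostic we could try (an assumption on my part, not a confirmed fix) is rebuilding with asio's /dev/poll reactor disabled via BOOST_ASIO_DISABLE_DEV_POLL, so that asio falls back to its select-based reactor; if the busy loop then disappears, it is specific to dev_poll_reactor. Paths and extra flags below are illustrative:

```shell
# Diagnostic sketch: force asio to fall back from /dev/poll to select().
# Boost path and library flags are illustrative for a Solaris build.
CC -DBOOST_ASIO_DISABLE_DEV_POLL -I/path/to/boost_1_47_0 server.cpp \
   -o server -lsocket -lnsl -lpthread
```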
PS: I have looked into boost::asio::async_write with data larger than 65536 bytes, but that is not relevant here, since I make sure the message is not destroyed until the async_write completes.