我对提升 asio 还是很陌生,我在多线程服务器中遇到随机文件结束。
我可以在这个小例子中重现我的问题:
服务器:
这是一个简单的回声服务器。该协议很简单:
- (1) 一个客户端连接
- (2) 服务器读取一个字节。该字节是要读取和发回的字符串的长度。
- (3) 服务器读取 N 个字节。
- (4) 服务器返回 N+1 个字节给客户端,然后返回 (2)。
当客户端断开连接时(3)中捕获到一个 EOF 并且处理程序循环停止。
class MySocket{
public:
char buffer[257];
boost::asio::ip::tcp::socket socket;
MySocket(boost::asio::io_service*ios):socket(*ios){}
~MySocket(){}
};
//Handlers
void readN(std::shared_ptr<MySocket>server,const boost::system::error_code&ec);
//(4)
void echo(std::shared_ptr<MySocket>server,const boost::system::error_code&ec){
if(ec){
throw std::exception(("This is NOT OK: "+ec.message()).c_str());}
size_t n=server->buffer[0]&0xFF;
std::cout<<std::string(server->buffer+1,n)<<std::endl;
boost::asio::async_write(server->socket,boost::asio::buffer(server->buffer,n+1),boost::bind(readN,server,boost::asio::placeholders::error));}
//(3)
void read(std::shared_ptr<MySocket>server,const boost::system::error_code&ec){
if(ec){
throw std::exception(("This is OK: "+ec.message()).c_str());}
size_t n=server->buffer[0]&0xFF;
boost::asio::async_read(server->socket,boost::asio::buffer(server->buffer+1,n),boost::bind(echo,server,boost::asio::placeholders::error));}
//(2)
void readN(std::shared_ptr<MySocket>server,const boost::system::error_code&ec){
if(ec){
throw std::exception(("This is also NOT OK: "+ec.message()).c_str());}
boost::asio::async_read(server->socket,boost::asio::buffer(server->buffer+0,1),boost::bind(read,server,boost::asio::placeholders::error));}
//Server
void serve(boost::asio::io_service*ios){
for(;;){
try{ios->run();break;}
catch(const std::exception&e){std::cout<<e.what()<<std::endl;}}}
//(1)
void accept(boost::asio::io_service*ios,boost::asio::ip::tcp::acceptor*acceptor,std::shared_ptr<MySocket>server,const boost::system::error_code&ec){
if(server.get()!=nullptr){
server->socket.set_option(boost::asio::ip::tcp::no_delay(true));
readN(server,ec);}
server.reset(new MySocket(ios));
acceptor->async_accept(server->socket,boost::bind(accept,ios,acceptor,server,boost::asio::placeholders::error));}
int main(){
boost::asio::io_service ios;
boost::asio::ip::tcp::acceptor acceptor(ios,boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),1207));
boost::asio::io_service::work work(ios);
accept(&ios,&acceptor,nullptr,boost::system::error_code());
// std::thread other(boost::bind(serve,&ios));
serve(&ios);
acceptor.close();
ios.stop();
// other.join();
return 0;}
客户:
客户端连接到服务器一次并发送 1000 个字符串。
int main(){
try{
boost::asio::io_service ios;
boost::asio::ip::tcp::socket socket(ios);
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string("127.0.0.1"),1207);
socket.connect(endpoint);
socket.set_option(boost::asio::ip::tcp::no_delay(true));
char buf[257];
for(size_t i=0;i<1000;++i){
size_t n=(i%127)+1;
buf[0]=(char)n;
for(size_t j=0;j<n;++j){
buf[j+1]=(char)('A'+(j+i)%26);}
socket.send(boost::asio::buffer(buf,n+1));
socket.receive(boost::asio::buffer(buf,1));
if((buf[0]&0xFF)!=n){
throw std::exception("Oups!");}
socket.receive(boost::asio::buffer(buf+1,n));
for(size_t j=0;j<n;++j){
if(buf[j+1]!=(char)('A'+(j+i)%26)){
throw std::exception("Oups!");}}
std::cout<<i<<": "<<std::string(buf+1,n)<<std::endl;}}
catch(const std::exception&e){
std::cout<<e.what()<<std::endl;}
return 0;}
当服务器仅使用一个线程(其他线程已注释)时,服务器会正确回显 1000 个字符串。
当服务器使用另一个线程时,在(4)中在随机打印字符串之后捕获一个EOF。这不应该发生。
- 我尝试用一条链包装所有异步调用,但它不起作用。
- 据我所知,不存在数据竞争问题。处理程序应该一个接一个地调用。
我错过了什么 ?
处理多线程 asio 应用程序的正确习惯用法是什么?
编辑 :
我做了一些测试,看来如果我替换这条线
throw std::exception(("This is NOT OK: "+ec.message()).c_str());
和:
std::cout<<"This is not OK: "<<ec.message()<<std::endl;
即使我看到一些 EOF 被错误地作为参数传递了几次,服务器也会正确地回显 1000 行。
所以我想问题是为什么当套接字显然没有关闭时我得到一个不正确的 boost::asio::error::eof ?
这不是这里所说的。