我正在尝试使用 boost::beast 阅读 RocketChat websocket API。当我使用同步示例阅读它时它可以工作,但是当以某种方式使用 async_* 函数时,它似乎连接已关闭。
有问题的违规代码:
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/connect.hpp>
#include <iostream>
#include <deque>
class connection
{
public:
connection(boost::asio::io_context &io_context, boost::asio::ssl::context &ssl_context) :
_io_context{ io_context },
_resolver{ io_context },
_websocket{ io_context, ssl_context },
_dynamic_buffer{ boost::asio::dynamic_buffer(_read_buffer) }
{}
void connect(std::string_view host)
{
auto addresses = _resolver.resolve(host, "https");
boost::asio::connect(_websocket.lowest_layer(), addresses);
_websocket.next_layer().handshake(boost::asio::ssl::stream_base::client);
_websocket.handshake(boost::beast::string_view{ host.data(), host.size() }, "/websocket");
read_message();
}
void send(std::string &&message)
{
_write_buffer.push_back(std::move(message));
if (_write_buffer.size() == 1) {
send_message();
}
_websocket.write(boost::asio::buffer(message));
}
private:
using tcp_socket = boost::asio::ip::tcp::socket;
using ssl_socket = boost::asio::ssl::stream<tcp_socket>;
void read_message()
{
_websocket.async_read(_dynamic_buffer, [this](const boost::system::error_code &error, size_t transferred) {
if (error == boost::asio::error::operation_aborted) {
std::cout << "read aborted" << std::endl;
return;
} else if (error) {
std::cout << "read error: " << error.message() << std::endl;
return;
}
std::cout << " >> " << _read_buffer << std::endl;
_read_buffer.clear();
read_message();
});
}
void send_message()
{
if (_write_buffer.empty()) {
return;
}
_websocket.async_write(boost::asio::buffer(_write_buffer.front()), [this](const boost::system::error_code &error, size_t transferred) {
if (error == boost::asio::error::operation_aborted) {
std::cout << "write aborted" << std::endl;
return;
} else if (error) {
std::cout << "write error: " << error.message() << std::endl;
return;
}
if (transferred != _write_buffer.front().size()) {
std::cout << "Wrote " << transferred << " bytes instead of the expected " << _write_buffer.front().size() << std::endl;
return;
}
std::cout << " << " << _write_buffer.front() << std::endl;
_write_buffer.pop_front();
send_message();
});
}
boost::asio::io_context &_io_context;
boost::asio::ip::tcp::resolver _resolver;
boost::beast::websocket::stream<ssl_socket> _websocket;
std::string _read_buffer;
decltype(boost::asio::dynamic_buffer(_read_buffer)) _dynamic_buffer;
std::deque<std::string> _write_buffer;
};
int main()
{
boost::asio::io_context io_context;
boost::asio::ssl::context ssl_context { boost::asio::ssl::context::sslv23_client };
connection connection { io_context, ssl_context };
connection.connect("open.rocket.chat");
send("{\"msg\":\"connect\",\"version\":\"1\",\"support\":[\"1\"]}");
io_context.run();
}
运行此代码会产生以下输出:
>> {"server_id":"0"}
<< {"msg":"connect","version":"1","support":["1"]}
read error: The WebSocket stream was gracefully closed at both endpoints
如果我在连接后不发送初始消息,则连接不会关闭。如果我不使用异步调用,而是使用同步版本,它也不会关闭。
我做错了可能是非常愚蠢的事情。如果我正确解释了文档,那么在 websocket 上启动读取和写入应该是有效的,只要它们是从同一个链完成的,并且由于我只有一个线程,我应该有一个隐式链。
我究竟做错了什么?