我正在使用安全的 websocket boost::beast 实现,我希望能够在处理当前消息时接收新消息。所以我选择使用协程(带产量的方法)尝试它
- 这是我的 websocket 对象:
using Websocket = boost::beast::websocket::stream<
boost::beast::ssl_stream<boost::beast::tcp_stream>>;
std::optional<Websocket> ws_;
- 这就是我如何调用我的 websocket 侦听器代码
ws_.reset();
boost::asio::spawn(ioc_,
[=](const boost::asio::yield_context &yield) {
this->startAndServeWs(yield);
});
}
这就是我的 websocket 处理程序方法的工作原理:注意它分为两部分。
首先,初始化部分。
其次,准备在其中读取新消息的 websocket 主循环。
所以我的问题是下面的代码是否适合在处理当前消息时从服务器获取新消息(其中一个sendPostFetchItems
或者sendPostDownloadNewVersion
可能需要一段时间,因为它们触发了 http post 请求并等待服务器响应)。如果没有,那么我可以假设新消息将排队,等待当前消息句柄完成时的下一次迭代吗?
我的第二个问题是关于 catch 语句,如果这是重试连接的正确方法
void Comm::startAndServeWs(const boost::asio::yield_context &yield) {
try {
// webSocet init according to ssl context and io_context.
ws_.emplace(ioc_, ctx_);
boost::beast::get_lowest_layer(ws_.value())
.expires_after(std::chrono::seconds(30));
boost::asio::ip::tcp::resolver resolver(io_context_);
auto const results =
resolver.async_resolve(ip_.value(), port_.value(), yield);
auto ep = boost::beast::get_lowest_layer(ws_.value())
.async_connect(results, yield);
// Set SNI Hostname (many hosts need this to handshake successfully)
if (!SSL_set_tlsext_host_name(ws_.value().next_layer().native_handle(),
ip_.value().c_str())) {
throw("Failed to set SNI Hostname");
}
// Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake.
// Se e https://tools.ietf.org/html/rfc7230#section-5.4
auto address = ip_.value() + std::to_string(ep.port());
// Perform the SSL handshake
ws_.value().next_layer().handshake(boost::asio::ssl::stream_base::client);
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
boost::beast::get_lowest_layer(ws_.value()).expires_never();
// Set suggested timeout settings for the websocket
ws_.value().set_option(
boost::beast::websocket::stream_base::timeout::suggested(
boost::beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.value().set_option(boost::beast::websocket::stream_base::decorator(
[](boost::beast::websocket::request_type &req) {
req.set(boost::beast::http::field::user_agent, kWebsocketIdentifier);
}));
Log("trying to establish websocket in address {}", address);
ws_.value().async_handshake(address, "/ws", yield);
//////////////////// here's the websocket main loop.
for (;;) {
boost::beast::flat_buffer buffer;
// Read a message into our buffer
---> ws_.value().async_read(buffer, yield);
Log("websocket response buffer = {}",boost::beast::make_printable(buffer.data()));
try {
nlohmann::json response = nlohmann::json::parse(s);
if (response["command"] == "fetchItems") {
sendPostFetchItems();
} else if (response["command"] == "getLogs") {
sendPostDownloadNewVersion();
}...
}
} catch (std::exception &e) {
Log("websocket reconnect failed. reason = {}", e.what());
ws_.reset();
timer_websocket_.expires_at(timer_websocket_.expiry() +
boost::asio::chrono::seconds(10));
timer_websocket_.async_wait(
[this]([[maybe_unused]] const boost::system::error_code &error) {
boost::asio::spawn(io_context_,
[this](const boost::asio::yield_context &yield) {
this->startAndServeWs(yield);
});
});
}
}