是否可以对 boost::asio 中的条件变量执行异步等待(读取:非阻塞)?如果不直接支持任何有关实施的提示,将不胜感激。
我甚至可以每隔几毫秒实现一个计时器并触发一次唤醒,但这种方法要差得多,我很难相信条件变量同步没有实现/记录。
是否可以对 boost::asio 中的条件变量执行异步等待(读取:非阻塞)?如果不直接支持任何有关实施的提示,将不胜感激。
我甚至可以每隔几毫秒实现一个计时器并触发一次唤醒,但这种方法要差得多,我很难相信条件变量同步没有实现/记录。
如果我正确理解了意图,你想在 asio 线程池的上下文中启动一个事件处理程序,当某个条件变量发出信号时?我认为在处理程序的开头等待条件变量就足够了,而io_service::post()本身最终又回到了池中,这种情况:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;
void handler()
{
boost::unique_lock<boost::mutex> lk(mx);
cv.wait(lk);
std::cout << "handler awakened\n";
io.post(handler);
}
void buzzer()
{
for(;;)
{
boost::this_thread::sleep(boost::posix_time::seconds(1));
boost::lock_guard<boost::mutex> lk(mx);
cv.notify_all();
}
}
int main()
{
io.post(handler);
boost::thread bt(buzzer);
io.run();
}
我可以建议基于 boost::asio::deadline_timer 的解决方案,这对我来说很好。这是 boost::asio 环境中的一种异步事件。一件非常重要的事情是,'handler' 必须通过与 'cancel' 相同的 'strand_' 序列化,因为在多个线程中使用 'boost::asio::deadline_timer' 不是线程安全的。
class async_event
{
public:
async_event(
boost::asio::io_service& io_service,
boost::asio::strand<boost::asio::io_context::executor_type>& strand)
: strand_(strand)
, deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin))
{}
// 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one'
// because using 'boost::asio::deadline_timer' from multiple threads is not thread safe
template<class WaitHandler>
void async_wait(WaitHandler&& handler) {
deadline_timer_.async_wait(handler);
}
void async_notify_one() {
boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this));
}
void async_notify_all() {
boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this));
}
private:
void async_notify_one_serialized() {
deadline_timer_.cancel_one();
}
void async_notify_all_serialized() {
deadline_timer_.cancel();
}
boost::asio::strand<boost::asio::io_context::executor_type>& strand_;
boost::asio::deadline_timer deadline_timer_;
};
不幸的是,Boost ASIO 没有async_wait_for_condvar()
方法。
在大多数情况下,您也不需要它。以 ASIO 方式编程通常意味着您使用链而不是互斥锁或条件变量来保护共享资源。除了在启动和退出时通常关注正确的构造或销毁顺序的极少数情况外,您根本不需要互斥锁或条件变量。
修改共享资源时,经典的部分同步线程方式如下:
完全异步的 ASIO 方式是:
这是一个类的示例some_shared_resource
,它接收一个字符串state
并根据接收到的状态触发一些进一步的处理。请注意,私有方法中的所有处理some_shared_resource::receive_state()
都是完全线程安全的,因为 strand 会序列化所有调用。
当然,这个例子并不完整;some_other_resource
需要与 类似的send_code_red()
方法some_shared_ressource::send_state()
。
#include <boost/asio>
#include <memory>
using asio_context = boost::asio::io_context;
using asio_executor_type = asio_context::executor_type;
using asio_strand = boost::asio::strand<asio_executor_type>;
class some_other_resource;
class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> {
asio_strand strand;
std::shared_ptr<some_other_resource> other;
std::string state;
void receive_state(std::string&& new_state) {
std::string oldstate = std::exchange(state, new_state);
if(state == "red" && oldstate != "red") {
// state transition to "red":
other.send_code_red(true);
} else if(state != "red" && oldstate == "red") {
// state transition from "red":
other.send_code_red(false);
}
}
public:
some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other)
: strand(ctx.get_executor()), other(other) {}
void send_state(std::string&& new_state) {
boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable {
if(auto self = me.lock(); self) {
self->receive_state(std::move(new_state));
}
});
}
};
如您所见,一开始总是向 ASIO 发布帖子可能有点乏味。但是您可以将大部分“为类配备链”代码移动到模板中。
消息传递的好处:由于您不使用互斥锁,因此即使在极端情况下,您也不能再陷入僵局。此外,使用消息传递,创建高级并行性通常比使用经典多线程更容易。不利的一面是,围绕所有这些消息对象移动和复制非常耗时,这可能会减慢您的应用程序的速度。
最后一点:在形成的消息中使用弱指针send_state()
有助于可靠地销毁some_shared_resource
对象:否则,如果 A 调用 B,B 调用 C,C 调用 A(可能仅在超时或类似情况后),则使用共享指针而不是弱指针消息中的指针会创建循环引用,从而防止对象破坏。如果您确定永远不会有循环,并且处理来自待删除对象的消息不会造成问题,那么您当然可以使用shared_from_this()
代替weak_from_this()
。如果您确定在 ASIO 停止之前不会删除对象(并且所有工作线程都已加入主线程),那么您也可以直接捕获this
指针。