考虑一个阻塞函数: this_thread::sleep_for(milliseconds(3000));
我正在尝试获得以下行为:
Trigger Blocking Function
|---------------------------------------------X
我想触发阻塞功能,如果它需要太长时间(超过两秒),它应该超时。
我做了以下事情:
my_connection = observable<>::create<int>([](subscriber<int> s) {
auto s2 = observable<>::just(1, observe_on_new_thread()) |
subscribe<int>([&](auto x) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
});
}) |
timeout(seconds(2), observe_on_new_thread());
我不能让它工作。对于初学者,我认为 s 不能从不同的线程 on_next 。
所以我的问题是,这样做的正确反应方式是什么?如何在 rxcpp 中包装阻塞函数并为其添加超时?
随后,我想获得一个行为如下的 RX 流:
Trigger Cleanup
|------------------------X
(Delay) Trigger Cleanup
|-----------------X