3

考虑一个阻塞函数: this_thread::sleep_for(milliseconds(3000));

我正在尝试获得以下行为:

Trigger Blocking Function               

|---------------------------------------------X

我想触发阻塞功能,如果它需要太长时间(超过两秒),它应该超时。

我做了以下事情:

my_connection = observable<>::create<int>([](subscriber<int> s) {
    auto s2 = observable<>::just(1, observe_on_new_thread()) |
    subscribe<int>([&](auto x) {
        this_thread::sleep_for(milliseconds(3000));
        s.on_next(1);
    });
}) |
timeout(seconds(2), observe_on_new_thread());

我不能让它工作。对于初学者,我认为 s 不能从不同的线程 on_next 。

所以我的问题是,这样做的正确反应方式是什么?如何在 rxcpp 中包装阻塞函数并为其添加超时?

随后,我想获得一个行为如下的 RX 流:

Trigger                Cleanup

|------------------------X
                           (Delay)   Trigger           Cleanup
                                       |-----------------X
4

1 回答 1

2

好问题!上面的很接近。

这是一个如何使阻塞操作适应 rxcpp 的示例。它执行libcurl 轮询以发出 http 请求。

以下应该做你想要的。

auto sharedThreads = observe_on_event_loop();

auto my_connection = observable<>::create<int>([](subscriber<int> s) {
        this_thread::sleep_for(milliseconds(3000));
        s.on_next(1);
        s.on_completed();
    }) |
    subscribe_on(observe_on_new_thread()) |
    //start_with(0) | // workaround bug in timeout
    timeout(seconds(2), sharedThreads);
    //skip(1); // workaround bug in timeout

my_connection.as_blocking().subscribe(
    [](int){}, 
    [](exception_ptr ep){cout << "timed out" << endl;}
);
  • subscribe_oncreate在专用线程上运行,因此create可以阻止该线程。
  • timeout将在另一个线程上运行计时器,该线程可以与其他线程共享,并将所有//on_next调用转移到同一线程。on_erroron_completed
  • as_blocking将确保在subscribe完成之前不会返回。这仅用于防止main()退出 - 最常见于测试或示例程序。

编辑:为timeout. 目前,在第一个值到达之前,它不会安排第一次超时。

EDIT-2:timeout错误已修复,不再需要解决方法。

于 2017-07-12T18:40:42.193 回答