我不熟悉使用 rxcpp 并尝试在以下情况下将某些功能组合在一起:
我有一个数据源,它将从一个单独的源检索命令,我正在编写的代码会将这些命令检索到一个 rxcpp observable 中。它有一个特殊的条件,如果在一定时间内没有收到命令,订阅者会运行 onError 函数而不是 onNext,但超时只能发生在收到第一个命令之前。在接收到第一个命令后,无论它需要多长时间才能接收到任何进一步的命令,都不会发生超时。
我正在尝试通过以下方式完成此操作:
auto timeout = rxcpp::observable<>::timer(std::chrono::steady_clock::now() + timeout,
rxcpp::observe_on_event_loop()).map([](int val) // Note, converts the value type of the timer observable and converts timeouts to errors
{
std::cout << "TIMED OUT!" << std::endl;
throw std::runtime_error("timeout");
return command_type();
});
auto commands = timeout.amb(rxcpp::observe_on_event_loop(), createCommandSource(event_loop_scheduler, ...));
我遇到的问题是超时发生在收到任何命令之前,即使它们在超时发生之前很远。我已经尝试过从 1000 毫秒到 5000 毫秒的超时,这没有任何区别。但是,如果我删除超时代码,则会立即收到命令。我怀疑我很可能只是误解了如何在 rxcpp 中使用调度程序,所以我想知道如何实现这一点。