1

我正在尝试创建一个 RxJava BlockingObservable,它将每 X 毫秒发出一个变量的值,直到(条件 == true)或发生超时。

下面的代码似乎接近我想要的,但它总是发出一次然后退出。奇怪的是,我有一个takeUntil()永远不会成立的条件——我希望这个 observable 持续发射并最终超时,但事实并非如此。

我在这里错过了什么/做错了什么?

Observable.fromCallable(() -> getSendWindow())
        .sample(10, TimeUnit.MILLISECONDS)
        .timeout(30, TimeUnit.SECONDS)
        .takeUntil(sendWindow -> 1==2)
        .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
        .doOnCompleted(() -> {
            log.info("Send window cleared");
        })
        .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());
4

1 回答 1

2

.sample 不会像您认为的那样做。采样率将上述 Observable 限制为(最多)每 10 秒一次。

Observable.fromCallable()只发出一次事件,然后完成。

.sample()每 10 秒等待 10 秒并发出最后一个事件(如果有)。因此,当您将它附加到只有一个事件的 Observable 时,它​​只会发出一个事件。然后就完成了。

您可能真正想要的(我是.net 程序员,所以请原谅我的套管等)就是这个。

编辑:感谢@akanokd 告诉我java 使用间隔来重复事件。

Observable.interval(10, timeUnit.MILLISECONDS)
    .map(x -> getSendWindow())
    .takeUntil(sendWindow -> 1==2)
    .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
     .doOnCompleted(() -> {
            log.info("Send window cleared");
        })
    .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());

随意使用对 JAVA 特定版本的 API 调用来编辑此答案...

于 2016-12-22T02:56:01.580 回答