我正在尝试创建一个 RxJava BlockingObservable
,它将每 X 毫秒发出一个变量的值,直到(条件 == true)或发生超时。
下面的代码似乎接近我想要的,但它总是发出一次然后退出。奇怪的是,我有一个takeUntil()
永远不会成立的条件——我希望这个 observable 持续发射并最终超时,但事实并非如此。
我在这里错过了什么/做错了什么?
Observable.fromCallable(() -> getSendWindow())
.sample(10, TimeUnit.MILLISECONDS)
.timeout(30, TimeUnit.SECONDS)
.takeUntil(sendWindow -> 1==2)
.doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
.doOnCompleted(() -> {
log.info("Send window cleared");
})
.toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());