我有一个随意记录的数据流,但是在我的程序的某个状态下,我需要来自流的数据,而不是到目前为止观察到的最新数据(我可以这样做),而是之后的最新数据:
val dataStream : Observable[Data] = ...
dataStream
.doOnNext(logger.trace(_))
.subscribe()
// ...
// at some state of program later:
// ...
def awaitOneElement(Observable[Data]): Data = ???
val expecting: Data = awaitOneElement(dataStream.filter(...))
我该如何实施def awaitOneElement(Observable[Data]): Data = ???
?
我知道这可能是惯用的不正确,但肮脏的同步等待正是我所需要的。我也很好Observable[Data] => Future[Data]
,将Await
在下一步结束。