2

我有一个随意记录的数据流,但是在我的程序的某个状态下,我需要来自流的数据,而不是到目前为止观察到的最新数据(我可以这样做),而是之后的最新数据:

val dataStream : Observable[Data] = ...
dataStream
 .doOnNext(logger.trace(_))
 .subscribe()

// ...
// at some state of program later:
// ...
def awaitOneElement(Observable[Data]): Data = ???
val expecting: Data = awaitOneElement(dataStream.filter(...))

我该如何实施def awaitOneElement(Observable[Data]): Data = ???

我知道这可能是惯用的不正确,但肮脏的同步等待正是我所需要的。我也很好Observable[Data] => Future[Data],将Await在下一步结束。

4

2 回答 2

0

如果你只需要一个元素,你可以使用类似Consumer.heador的东西Consumer.headOption

val first: Task[Option[Int]] =
  Observable.empty[Int].consumeWith(Consumer.headOption)]

现在您可以将任务转换为未来。如果您需要所有元素,您可以将 foldLeft 与累加器一起使用:

Consumer.foldLeft[T,Vector[T]](Vector.empty[T])((vec, t) => vec :+ t )

或者在需要时实现自定义消费者和触发回调。取自文档: https ://monix.io/docs/3x/reactive/consumer.html

UPD:对于最后一个元素,您可以使用这样的消费者:

Consumer.foldLeft[T,Option[T]](Option.empty[T])((opt, t) => Some(t))
于 2020-02-21T11:51:56.723 回答
0

我的解决方案:

  implicit val scheduler                     = mx
  val pendingPromise: AtomicReference[Option[Promise[A]]] = new AtomicReference(None)
  val lastSubject: ConcurrentSubject[A, A]   = ConcurrentSubject.publish[A]

  o.doOnNext(a => if (pendingPromise.get().nonEmpty) lastSubject.onNext(a))
    .subscribe()

  lastSubject
    .filter(filter)
    .doOnNext(a => pendingPromise.getAndSet(None).map(_.success(a)))
    .subscribe()

  def getNext(duration: Duration): Try[A] = {
    val promise = Promise[A]()
    pendingPromise.set(Some(promise))
    Try(Await.result(promise.future, duration))
  }
}

调用 getNext 时,Promise会创建 a 并返回 future。当 Observable 中发生期望的事件时,promise 会被填充并移除。

于 2020-02-22T07:34:35.207 回答