0

我想使用Subject在不使用mapflatMap的情况下在一个 Observable 到下一个 Observable 之间创建本质上的管道,因为使用它们非常冗长。

我不知道如何用 Subject 做到这一点,尽管这似乎是正确的方法,因为 Subject (根据文档):

既充当订阅者又充当 Observable

签名Subject<T,R>在它是一个Observable<R>和一个的地方Observer<T>。这意味着我应该能够通过管道T传输到R.

这是它在代码中的基本外观:

class MySubject extends Subject<T, R> {

  protected MySubject(OnSubscribe<R> onSubscribe) {
    super(onSubscribe);
  }

  @Override public void onNext(T in) {
    // Data comes in
  }

  @Override public void onCompleted() {
    // do something
  }

  @Override public void onError(Throwable throwable) {
    // do something with the error
  }

}
4

1 回答 1

0

我在Monifu中通过以 flatMap 方式提供观察者来处理这个问题(忽略PublishSubject是最终类的事实)。在 RxJava 或 RxScala 中应该是类似的:

class PipeSubject extends PublishSubject[RxEvent] {
  override def onNext(elem: RxEvent): Future[Ack] = {
    if (!isCompleted) {
      val observers = subscriptions
      if (observers.nonEmpty)
        pipeThroughMany(observers, elem)
      else
        Continue
    }
    else
      Cancel
  }

 private[this] def pipeThroughMany(array: Array[Observer[T]], elem: T): Future[Continue] = {
    val length = array.length
    def >>>(idx: Int = 0): Future[Continue] = {
      val obs = array(idx)
      obs.onNext(elem).flatMap {
         case Continue =>
           if (idx+1 < length)
              >>>(idx+1)
           else
             Continue
         case _ =>
           removeSubscription(obs)
           Continue
      }
    }
    >>>()
  }
}
于 2014-12-04T16:33:13.913 回答