2

想象一下您向其发出事件的订阅者管道,它一个接一个地访问一个订阅者。

有一个 PublishSubject 和 x 个订阅者/观察者。通常,事件会以特定顺序发送给观察者,但同时不管观察者何时返回。是否可以执行此流程:

  1. 向观察者A发出事件
  2. osbserverA 返回后,将事件发送给observerB
  3. 在observerB返回后,将事件发送给observerC

我正在使用RxScalaMonifu Rx 实现

Monifu 甚至有一个背压实现:

def onNext(elem: T): Future[Ack]

我希望在此示例中打印出“结果是:已更改!! ”:

  val subject = PublishSubject[Int]()

  var result = "Not Changed"
  subject.subscribe { i =>
    Observable.timerOneTime(3.seconds, Continue).asFuture.map { x =>
      result = "Changed !!"
      x.get
    }
  }

  subject.subscribe { i =>
    Observable.timerOneTime(1.seconds, Continue).asFuture.map { x =>
      println("And Result was : " + result)
      x.get
    }
  }

  subject.onNext(1)

在 RxScala/RxJava 或 Monifu 中是否可以不扩展 Subject 并覆盖 onNext 实现?无论如何,这些类都被声明为最终的,所以它会相当黑客。

4

1 回答 1

1

我认为答案是一个自定义的 Subject 实现,就像 Monifu 中的这样,它会以 flatMap 方式为观察者提供信息(忽略PublishSubject是最终类的事实):

class PipeSubject extends PublishSubject[RxEvent] {
  override def onNext(elem: RxEvent): Future[Ack] = {
    if (!isCompleted) {
      val observers = subscriptions
      if (observers.nonEmpty)
        pipeThroughMany(observers, elem)
      else
        Continue
    }
    else
      Cancel
  }

 private[this] def pipeThroughMany(array: Array[Observer[T]], elem: T): Future[Continue] = {
    val length = array.length
    def >>>(idx: Int = 0): Future[Continue] = {
      val obs = array(idx)
      obs.onNext(elem).flatMap {
         case Continue =>
           if (idx+1 < length)
              >>>(idx+1)
           else
             Continue
         case _ =>
           removeSubscription(obs)
           Continue
      }
    }
    >>>()
  }
}
于 2014-12-04T14:21:23.593 回答