想象一下您向其发出事件的订阅者管道,它一个接一个地访问一个订阅者。
有一个 PublishSubject 和 x 个订阅者/观察者。通常,事件会以特定顺序发送给观察者,但同时不管观察者何时返回。是否可以执行此流程:
- 向观察者A发出事件
- osbserverA 返回后,将事件发送给observerB
- 在observerB返回后,将事件发送给observerC
Monifu 甚至有一个背压实现:
def onNext(elem: T): Future[Ack]
我希望在此示例中打印出“结果是:已更改!! ”:
val subject = PublishSubject[Int]()
var result = "Not Changed"
subject.subscribe { i =>
Observable.timerOneTime(3.seconds, Continue).asFuture.map { x =>
result = "Changed !!"
x.get
}
}
subject.subscribe { i =>
Observable.timerOneTime(1.seconds, Continue).asFuture.map { x =>
println("And Result was : " + result)
x.get
}
}
subject.onNext(1)
在 RxScala/RxJava 或 Monifu 中是否可以不扩展 Subject 并覆盖 onNext 实现?无论如何,这些类都被声明为最终的,所以它会相当黑客。