Monix looks like great framework but documentation is very sparse.
What is alsoTo
analogue of akka-streams in monix ?
Basically I want stream to be consumed by two consumers.
Monix 遵循 Rx 模型,因为订阅是动态的。AnyObservable
支持无限数量的订阅者:
val obs = Observable.interval(1.second)
val s1 = obs.dump("O1").subscribe()
val s2 = obs.dump("O2").subscribe()
然而,有一个问题——Observable
默认情况下被称为“冷数据源”,这意味着每个订阅者都有自己的数据源。
因此,例如,如果您有一个Observable
从 a 读取的 a File
,那么每个订阅者都会获得自己的文件句柄。
为了Observable
在多个订阅者之间“共享”这样的数据,您必须将其转换为热数据源以进行共享。您可以使用最常用的multicast
运算符及其版本来执行此操作。publish
这些给你一个ConnectableObservable
,需要一个connect()
电话来开始流式传输:
val shared = obs.publish
// Nothing happens here:
val s1 = shared.dump("O1").subscribe()
val s2 = shared.dump("O2").subscribe()
// Starts actual streaming
val cancelable = shared.connect()
// You can subscribe after connect(), but you might lose events:
val s3 = shared.dump("O3").subscribe()
// You can unsubscribe one of your subscribers, but the
// data source keeps the stream active for the others
s1.cancel()
// To cancel the connection for all subscribers:
cancelable.cancel()
PS:monix.io正在进行中,欢迎 PRs