0

Monix looks like great framework but documentation is very sparse.

What is alsoTo analogue of akka-streams in monix ?

Basically I want stream to be consumed by two consumers.

4

1 回答 1

3

Monix 遵循 Rx 模型,因为订阅是动态的。AnyObservable支持无限数量的订阅者:

val obs = Observable.interval(1.second)

val s1 = obs.dump("O1").subscribe()
val s2 = obs.dump("O2").subscribe()

然而,有一个问题——Observable默认情况下被称为“冷数据源”,这意味着每个订阅者都有自己的数据源。

因此,例如,如果您有一个Observable从 a 读取的 a File,那么每个订阅者都会获得自己的文件句柄。

为了Observable在多个订阅者之间“共享”这样的数据,您必须将其转换为热数据源以进行共享。您可以使用最常用的multicast运算符及其版本来执行此操作。publish这些给你一个ConnectableObservable,需要一个connect()电话来开始流式传输:

val shared = obs.publish

// Nothing happens here:
val s1 = shared.dump("O1").subscribe()
val s2 = shared.dump("O2").subscribe()

// Starts actual streaming
val cancelable = shared.connect()

// You can subscribe after connect(), but you might lose events:
val s3 = shared.dump("O3").subscribe()

// You can unsubscribe one of your subscribers, but the
// data source keeps the stream active for the others
s1.cancel()

// To cancel the connection for all subscribers:
cancelable.cancel()

PS:monix.io正在进行中,欢迎 PRs

于 2018-02-14T08:04:14.357 回答