(在 RxKotlin 和 RxJava 中工作,但为了简单起见使用元代码)
许多指南首先从已经可用的数据Reactive Extensions
创建一个。Observable
从The Introduction to Reactive Programming you've been missing,它是从单个字符串创建的
var soureStream= Rx.Observable.just('https://api.github.com/users');
同样,来自RxKotlin的首页,来自填充列表
val list = listOf(1,2,3,4,5)
list.toObservable()
现在考虑一个简单的过滤器,它产生一个outStream
,
var outStream = sourceStream.filter({x > 3})
在这两个指南中,源事件都被声明为apriori。这意味着事件的时间线有某种形式
source: ----1,2,3,4,5-------
out: --------------4,5---
我怎样才能修改sourceStream
成为更多的管道?换句话说,在sourceStream
创建过程中没有可用的输入数据?当源事件可用时,它会立即由 out 处理:
source: ---1--2--3-4---5-------
out: ------------4---5-------
我希望找到一个Observable.add()
动态更新
var sourceStream = Observable.empty()
var outStream = sourceStream.filter({x>3})
//print each element as its added
sourceStream .subscribe({println(it)})
outStream.subscribe({println(it)})
for i in range(5):
sourceStream.add(i)
这可能吗?