1

(在 RxKotlin 和 RxJava 中工作,但为了简单起见使用元代码)

许多指南首先从已经可用的数据Reactive Extensions创建一个。ObservableThe Introduction to Reactive Programming you've been missing,它是从单个字符串创建的

var soureStream= Rx.Observable.just('https://api.github.com/users');

同样,来自RxKotlin的首页,来自填充列表

val list = listOf(1,2,3,4,5)
list.toObservable()     

现在考虑一个简单的过滤器,它产生一个outStream,

var outStream = sourceStream.filter({x > 3})

在这两个指南中,源事件都被声明为apriori。这意味着事件的时间线有某种形式

source: ----1,2,3,4,5-------
out:    --------------4,5---

我怎样才能修改sourceStream成为更多的管道?换句话说,在sourceStream创建过程中没有可用的输入数据?当源事件可用时,它会立即由​​ out 处理:

source: ---1--2--3-4---5-------
out:    ------------4---5-------

我希望找到一个Observable.add()动态更新

var sourceStream = Observable.empty()
var outStream = sourceStream.filter({x>3})

//print each element as its added 
sourceStream .subscribe({println(it)})
outStream.subscribe({println(it)})

for i in range(5):
    sourceStream.add(i)

这可能吗?

4

1 回答 1

2

我是新手,但是没有主题我怎么能解决我的问题呢?如果我正在测试一个应用程序,并且我希望它每 5 秒“弹出”一次更新,那么除了发布订阅业务之外,我还能怎么做呢?有人可以发布不涉及订阅者的这个问题的答案吗?

如果你想每五秒弹出一次更新,然后用interval操作符创建一个 Observable,不要使用 Subject。构建 Observable 有几十种不同的运算符,因此您很少需要主题。

也就是说,有时你确实需要一个,它们在测试代码时非常方便。我在单元测试中广泛使用它们。

使用主语还是不使用主语?是关于主题的优秀文章。

于 2017-05-14T01:48:58.347 回答