根据以下文件groupBy
:
注意:A
GroupedObservable
将缓存它要发出的项目,直到它被订阅。出于这个原因,为了避免内存泄漏,你不应该简单地忽略那些GroupedObservable
与你无关的s。相反,您可以向他们发出信号,表明他们可能会通过对他们应用类似的运算符take(int)(0)
来丢弃他们的缓冲区。
有一个RxJava 教程说:
在内部,每个 Rx 操作员做 3 件事
- 它订阅源并观察值。
- 它根据操作员的目的转换观察到的序列。
- 它通过调用 onNext、onError 和 onCompleted 将修改后的序列推送给自己的订阅者。
让我们看看下面的代码块,它只从 中提取偶数range(0, 10)
:
Observable.range(0, 10)
.groupBy(i -> i % 2)
.filter(g -> g.getKey() % 2 == 0)
.flatMap(g -> g)
.subscribe(System.out::println, Throwable::printStackTrace);
我的问题是:
这是否意味着
filter
运营商已经暗示订阅每个组产生的组groupBy
或只是Observable<GroupedObservable>
一个组?这种情况下会不会有内存泄漏?如果是这样的话,
如何正确丢弃这些组?替换
filter
为自定义的,其中 atake(0)
后跟 areturn Observable.empty()
?你可能会问我为什么不直接返回take(0)
:这是因为filter
不一定紧跟在 之后groupBy
,而是可以在链中的任何位置并涉及更复杂的条件。