5

根据以下文件groupBy

注意:AGroupedObservable将缓存它要发出的项目,直到它被订阅。出于这个原因,为了避免内存泄漏,你不应该简单地忽略那些GroupedObservable与你无关的s。相反,您可以向他们发出信号,表明他们可能会通过对他们应用类似的运算符take(int)(0)来丢弃他们的缓冲区。

有一个RxJava 教程说:

在内部,每个 Rx 操作员做 3 件事

  1. 它订阅源并观察值。
  2. 它根据操作员的目的转换观察到的序列。
  3. 它通过调用 onNext、onError 和 onCompleted 将修改后的序列推送给自己的订阅者。

让我们看看下面的代码块,它只从 中提取偶数range(0, 10)

Observable.range(0, 10)
        .groupBy(i -> i % 2)
        .filter(g -> g.getKey() % 2 == 0)
        .flatMap(g -> g)
        .subscribe(System.out::println, Throwable::printStackTrace);

我的问题是:

  1. 这是否意味着filter运营商已经暗示订阅每个组产生的组groupBy或只是Observable<GroupedObservable>一个组?

  2. 这种情况下会不会有内存泄漏?如果是这样的话,

  3. 如何正确丢弃这些组?替换filter为自定义的,其中 atake(0)后跟 a return Observable.empty()?你可能会问我为什么不直接返回take(0):这是因为filter不一定紧跟在 之后groupBy,而是可以在链中的任何位置并涉及更复杂的条件。

4

2 回答 2

5

除了内存泄漏之外,由于内部请求协调问题,当前的实现可能最终完全挂起。

请注意,使用take(0),可以一直重新创建组。我会改为使用ignoreElementswhich drop values,没有项目到达flatMap,并且组本身不会一直重新创建。

于 2015-11-25T22:37:33.560 回答
4

您的怀疑是正确的,因为要正确处理分组的可观察对象,每个内部可观察对象 ( g) 都必须订阅。就像filter订阅外部可观察对象一样,这是一个坏主意。只需在使用中执行您需要的flatMap操作ignoreElements即可过滤掉不需要的组。

Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .flatMap(g -> {
       if (g.getKey() % 2 == 0) 
         return g;
       else 
         return g.ignoreElements();
    })
    .subscribe(System.out::println, Throwable::printStackTrace);
于 2015-11-25T22:20:36.087 回答