0

我有一个可观察对象 obs1、obs2、obs3、...、

他们每个人都可以发出许多项目(来自 mongodb 数据库),我只对前 N 个项目感兴趣。我想确保仅在需要时才执行对我的 observables 的查询。换句话说,例如,如果 obs1 产生的结果多于 N,则 obs2 后面的查询不应该运行,等等。

如果我使用 concat: Observable(obs1, obs2, obs3, ...).concat,所有查询都可以在 mongodb 中并行运行

基本上,我正在寻找类似 obs1.switchIfX(obs2).switchIfX(obs3) 的操作.....

其中 X:当前 observable 发出少于 N 个项目。

知道如何以 rxscala 样式实现此要求吗?

4

2 回答 2

2

您可以尝试这样的事情(未经测试):

Observable.just(obs1, obs2, obs3).flatten(maxConcurrent=1).take(N)

maxConcurrent参数确保flatten一次只订阅一个可观察对象,并且一旦N发出项目,take将从上游可观察对象取消订阅,因此如果在那时,obs2或者obs3尚未订阅,它们将永远不会按需要运行。

于 2017-12-13T05:30:54.930 回答
0

您可以将源中的项目收集到列表中,在flatMap运算符中检查其大小,如果长度不够,则切换到另一个源:

@Test
public void test() {
    Observable.range(1, 5)
    .compose(switchIfFewer(Observable.range(1, 8), 10))
    .compose(switchIfFewer(Observable.range(1, 15), 10))
    .test()
    .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
}

static <T> ObservableTransformer<T, T> switchIfFewer(Observable<T> other, int n) {
    return o -> {
        return o.toList()
        .flatMapObservable(list -> {
            if (list.size() < n) {
                return other;
            }
            return Observable.fromIterable(list);
        });
    };
}

如果您不想获得超过 N 个项目,则可以指定o.take(n).toList()

于 2017-12-13T08:57:29.537 回答