0

我希望我的 api 上的一个方法返回 Observable<Observable<Object>> 但我希望该方法中的代码在所有包含的 Observables 完成后知道,以便它可以关闭某些东西。做这个的最好方式是什么?

更明确地说,我正在完成这个方法:

public static <T> Observable<Observable<T>> doWhenAllComplete(
        final Observable<Observable<T>> original, Action0 action) {
  ...
}
4

3 回答 3

2

很抱歉我的答案在 .NET 中(就像 system.reactive 标签一样);我相信你可以翻译它!

如果你当时IObservable<IObservable<Object>>给出source

source.Merge()
      .Subscribe(_  => {}, /* not interested in onNext */
                 () => /* onCompleted action here, called when all complete */);

注意:如果任何流错误(导致合并流在该点终止),这将崩溃,因此您也可以这样做以吞下各个流上的错误:

source.SelectMany(x => x.Catch(Observable.Empty<Object>()))
      .Subscribe(_  => {}, /* not interested in onNext */
                 () => /* onCompleted action here, called when all complete */);
于 2014-01-31T10:08:22.370 回答
0

我相信,该方法的这种实现似乎没有副作用:

public static <T> Observable<Observable<T>> doWhenAllComplete(
        final Observable<Observable<T>> original, final Action0 action) {
    return Observable.create(new OnSubscribeFunc<Observable<T>>() {

        @Override
        public Subscription onSubscribe(Observer<? super Observable<T>> o) {
            ConnectableObservable<Observable<T>> published = original
                    .publish();
            Subscription sub1 = Observable.merge(published)
                    .doOnCompleted(action).subscribe();
            Subscription sub2 = published.subscribe(o);
            Subscription sub3 = published.connect();
            return Subscriptions.from(sub1, sub2, sub3);
        }
    });
}
于 2014-02-09T04:03:19.630 回答
0

对我来说,这有效:

bothSources = source1.Cast<Object>().Merge (source2.Cast<Object>());

就我而言,我只需要等待 2 个源,但您可以创建一个函数来接收源列表并合并所有源。

于 2014-11-14T22:04:58.700 回答