20

我正在学习 RxJava,并且作为我的第一个实验,尝试重写此代码中第run()一种方法中的代码(在Netflix 的博客中引用为 RxJava 可以帮助解决的问题)以使用 RxJava 提高其异步性,即它不会f1.get()在继续执行其余代码之前,请等待第一个 Future ( ) 的结果。

f3取决于f1。我知道如何处理这个问题,flatMap似乎可以解决问题:

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });

接下来f4f5取决于f2. 我有这个:

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });

这开始变得很奇怪(merge他们可能不是我想要的......)但最后允许我这样做,而不是我想要的:

f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});

这给了我:

Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100

这是所有的数字,但不幸的是我在单独的调用中得到了结果,所以我不能完全替换原始代码中的最终 println:

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));

我不明白如何在同一行访问这两个返回值。我认为这里可能缺少一些函数式编程。我怎样才能做到这一点?谢谢。

4

3 回答 3

19

看起来你真正需要的只是对如何使用 RX 多一点鼓励和看法。我建议您更多地阅读文档以及大理石图(我知道它们并不总是有用的)。我还建议研究lift()函数和运算符。

  • observable 的全部意义在于将数据流和数据操作连接到一个对象中
  • 调用点mapflatMap操作filter数据流中的数据
  • 合并的重点是合并数据流
  • 操作符的目的是让您能够中断稳定的可观察数据流并定义您自己对数据流的操作。例如,我编写了一个移动平均运算符。这将 s 总结n double为双精度的 Observable 以返回移动平均线流。代码看起来像这样

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

许多您认为理所当然的过滤方法都隐藏在幕后,您会感到宽慰lift()

照这样说; 合并多个依赖项所需要的只是:

  • map使用或将所有传入数据更改为标准数据类型flatMap
  • 将标准数据类型合并到流中
  • 如果一个对象需要等待另一个对象,或者如果您需要在流中对数据进行排序,则使用自定义运算符。注意:这种方法会减慢流量
  • 用于列出或订阅以收集所有数据
于 2014-08-12T22:07:23.440 回答
7

编辑:有人将我作为问题编辑添加的以下文本转换为答案,我很欣赏并理解这可能是正确的做法,但是我不认为这是一个答案,因为它显然不是正确的方法。我永远不会使用此代码,也不会建议任何人复制它。欢迎其他/更好的解决方案和评论!


我能够用以下方法解决这个问题。我没有意识到你可以flatMap多次观察,我认为结果只能被消耗一次。所以我只是flatMapf2Observable 两次(抱歉,自从我的原始帖子以来,我在代码中重命名了一些东西),然后zip在所有 Observables 上,然后订阅它。Map由于类型杂耍,zip聚合值是不可取的。欢迎其他/更好的解决方案和评论!完整代码可在 gist 中查看。谢谢你。

Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
Observable<Integer> f2Observable = Observable.from(f2);
Observable<Integer> f4Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
            return Observable.from(f4);
        }       
    });     

Observable<Integer> f5Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
            return Observable.from(f5);
        }       
    });     

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }       
}).subscribe(new Action1<Map<String, String>>() {
    @Override
    public void call(Map<String, String> map) {
        System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
    }       
});     

这产生了我想要的输出:

responseB_responseA => 714000
于 2014-09-20T05:25:58.290 回答
2

我认为您正在寻找的是switchmap。我们遇到了类似的问题,我们有一个会话服务来处理从 api 获取新会话,我们需要该会话才能获取更多数据。我们可以添加到返回 sessionToken 的 session observable 中,以便在我们的数据调用中使用。

getSession 返回一个 observable;

public getSession(): Observable<any>{
  if (this.sessionToken)
    return Observable.of(this.sessionToken);
  else if(this.sessionObservable)
    return this.sessionObservable;
  else {
    // simulate http call 
    this.sessionObservable = Observable.of(this.sessonTokenResponse)
    .map(res => {
      this.sessionObservable = null;
      return res.headers["X-Session-Token"];
    })
    .delay(500)
    .share();
    return this.sessionObservable;
  }
}

并且 getData 获取该 observable 并附加到它。

public getData() {
  if (this.dataObservable)
    return this.dataObservable;
  else {
    this.dataObservable = this.sessionService.getSession()
      .switchMap((sessionToken:string, index:number) =>{
        //simulate data http call that needed sessionToken
          return Observable.of(this.dataResponse)
          .map(res => {
            this.dataObservable = null;
            return res.body;
          })
          .delay(1200)
        })
        .map ( data => {
          return data;
        })
        .catch(err => {
          console.log("err in data service", err);
         // return err;
        })
        .share();
    return this.dataObservable;
  }
}

您仍然需要一个平面图来组合不依赖的 observables。

Plunkr:http ://plnkr.co/edit/hiA1jP?p=info

我在哪里得到使用开关图的想法:http: //blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html

于 2016-10-04T16:23:43.910 回答