10

我正在使用带有 Observables 的 Retrofit,并且想链接 observables。通常它与map()or之类的函数配合得很好flatMap(),因为它会api返回一个完成任务的 Observable。但在这种情况下,我必须执行以下操作:

  1. getKey() 从api
  2. 在另一个库中使用返回的密钥Foo并等待回调被调用。
  3. 当回调返回时,将结果发送到api.

我希望这是一个单链调用,这样我只需要订阅一次。我猜我可以使用merge()orjoin()或其他东西,但不确定处理回调的最佳方法是什么。

有没有办法让它变得更好?这是我到目前为止所拥有的:

api.getKey().subscribe(new Action1<String>() {
   @Override
   public void call(String key) {
      Foo foo = new Foo();
      foo.setAwesomeCallback(new AwesomeCallback() {
         @Override
         public void onAwesomeReady(String awesome) {
            api.sendAwesome(awesome)
                    .subscribe(new Action1<Void>() {
                       @Override
                       public void call(Void aVoid) {
                           handleAwesomeSent();
                       }
                    });
         }
      });
      foo.makeAwesome();
   }
});
4

3 回答 3

17

适应 clemp6r 的解决方案,这是另一个既不需要Subjects也不需要嵌套的解决方案Subscriptions

api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {

        return Observable.create(new Observable.OnSubscribe<String>(){

            @Override
            public void call(final Subscriber<? super String> subscriber) {
                Foo foo = new Foo();
                foo.setAwesomeCallback(new AwesomeCallback() {
                    @Override
                    public void onAwesomeReady(String awesome) {
                        if (! subscriber.isUnsubscribed()) {
                            subscriber.onNext(awesome);
                            subscriber.onComplete();
                        }
                    }
                });
                foo.makeAwesome();
            } 
        });
}).flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String awesome) {
        return sendAwesome(awesome);
   }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});

一般来说,我认为总是可以将任何基于回调的异步操作包装在Observableusing 中Observable.create()

于 2015-04-16T18:11:27.250 回答
4

您必须使用 PublishSubject 将基于回调的 API 转换为 observable。

尝试类似的东西(未测试):

api.getKey().flatMap(new Func1<String, Observable<String>>() {
   @Override
   public Observable<String> call(String key) {
      Foo foo = new Foo();
      PublishSubject<String> subject = PublishSubject.create();
      foo.setAwesomeCallback(new AwesomeCallback() {
         @Override
         public void onAwesomeReady(String awesome) {
            subject.onNext(awesome);
            subject.onComplete();
         }
      });
      foo.makeAwesome();

      return subject;
   }
}).flatMap(new Func1<String, Observable<String>>() {
   @Override
   public Observable<String> call(String awesome) {
       return sendAwesome(awesome);
   }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});
于 2015-04-16T15:48:32.750 回答
1
Api api = new Api() {
  @Override Single<String> getKey() {
    return Single.just("apiKey");
  }
};

api.getKey()
    .flatMap(key -> Single.<String>create( singleSubscriber -> {
        Foo foo = new Foo();
        foo.setAwesomeCallback(awesome -> {
          try { singleSubscriber.onSuccess(awesome);}
          catch (Exception e) { singleSubscriber.onError(e); }
        });
        foo.makeAwesome();
    }))
    .flatMapCompletable(
        awesome -> Completable.create(completableSubscriber -> {
          try {
            sendAwesome(awesome);
            completableSubscriber.onCompleted();
          } catch (Exception e) { completableSubscriber.onError(e); }
        }))
    .subscribe(this::handleAwesomeSent, throwable -> { });

有关完整的匿名类示例,请参见要点

此实现通过使用SingleCompletable类型以及 来适应 david.mihola 答案flatMapCompletable(),同时是类型安全/特定的。

于 2016-08-26T00:18:10.513 回答