8

我很难找到如何使用 RxJava 2 制作自定义运算符的示例。我考虑了一些方法:

  1. 使用Observable.create, 然后flatMap从源 observable 对其进行 ing。我可以让它工作,但感觉不太对劲。我最终创建了一个提供源的静态函数Observable,然后在源上创建了 flatMap。在 OnSubscribe 中,然后我实例化一个我将发射器传递给的对象,该对象处理和管理 Observable / Emitter(因为它不是微不足道的,我希望一切都尽可能封装)。
  2. 创建一个ObservableOperator并将其提供给Observable.lift. 对于 RxJava 2,我找不到任何这样的示例。我不得不调试自己的示例,以确保我对上游和下游的理解是正确的。因为我找不到任何关于 RxJava 2 的示例或文档,所以我有点担心我可能会不小心做一些我不应该做的事情。
  3. 创建我自己的Observable类型。这似乎是底层运算符的工作方式,其中许多扩展了AbstractObservableWithUpstream. 不过这里发生了很多事情,似乎很容易错过一些事情或做一些我不应该做的事情。我不确定我是否应该采取这样的方法。我逐步完成了心理过程,似乎它很快就会变得毛茸茸。

我将继续使用选项#2,但认为值得询问 RxJava2 中支持的执行此操作的方法是什么,并找出是否有任何文档或示例。

4

2 回答 2

6

不建议初学者编写运算符,并且可以通过现有运算符实现许多所需的流模式。

你看过 RxJava 的关于为 2.x 编写操作符的 wiki吗?我建议从上到下阅读。

  1. usingcreate()是可能的,但大多数人使用它来通过 for-each 循环发出 a 的元素List,而没有意识到这样Flowable.fromIterable做。
  2. 尽管 RxJava 2 运算符不使用lift()自己,但我们保留了这个扩展点。如果您想避免使用选项 3. 的一些样板,那么您可以尝试这条路线
  3. 这就是 RxJava 2 操作符的实现方式。AbstractObservableWithUpstream是一个小小的便利,对于外部实现者来说不是必需的。
于 2017-02-01T20:20:22.747 回答
2

这可能会对您有所帮助。我实现了运算符 RxJava2 来处理 APiError。我使用电梯操作员。

请参阅示例。

  public final class ApiClient implements ApiClientInterface {
    ...
      @NonNull
      @Override
      public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) {
          return myApiService.activate(email, emailData)
                  .lift(getApiErrorTransformer())
                  .subscribeOn(Schedulers.io());
      }

      private <T>ApiErrorOperator<T> getApiErrorTransformer() {
          return new ApiErrorOperator<>(gson, networkService);
      }

  }

然后你可以找到自定义运算符

    public final class ApiErrorOperator<T> implements ObservableOperator<T, T> {
        private static final String TAG = "ApiErrorOperator";
        private final Gson gson;
        private final NetworkService networkService;

        public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) {
            this.gson = gson;
            this.networkService = networkService;
        }

        @Override
        public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
            return new Observer<T>() {
                @Override
                public void onSubscribe(Disposable d) {
                    observer.onSubscribe(d);
                }

                @Override
                public void onNext(T value) {
                    observer.onNext(value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError", e);

                if (e instanceof HttpException) {
                        try {
                            HttpException error = (HttpException) e;
                            Response response = error.response();
                            String errorBody = response.errorBody().string();

                            ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class);
                            ApiException exception = new ApiException(errorResponse, response);

                            observer.onError(exception);
                        } catch (IOException exception) {
                            observer.onError(exception);
                        }

                    } else if (!networkService.isNetworkAvailable()) {
                        observer.onError(new NetworkException(ErrorResponse.builder()
                                .setErrorCode("")
                                .setDescription("No Network Connection Error")
                                .build()));
                    } else {
                        observer.onError(e);
                    }
                }

                @Override
                public void onComplete() {
                    observer.onComplete();
                }
            };
        }
    }
于 2017-06-08T09:10:06.743 回答