0

我正在使用 Fernando Ceja 的简洁架构开发一个 Android 应用程序。我的交互者或用例之一负责获取用户的提要数据。为了获取数据,首先我必须从数据库表中检索用户的团队,然后我必须从服务器端获取提要列表。

这就是我从数据库层获取团队的方式:

    mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
        @Override
        public void onNext(List<SimpleTeam> simpleTeams) {
            super.onNext(simpleTeams);
            mTeams = simpleTeams;
        }
    });

TeamCache 基本上只是另一个负责获取我在数据库中的所有团队的交互器。

以下是我从服务器端获取 Feed 数据的方法:

    mFeedRepository.getFeed(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
        @Override
        protected void onServerSideError(Throwable errorResponse) {
            callback.onFeedFetchFailed(...);
        }

        @Override
        protected void onSuccess(List<ApiFeedResponse> responseBody) {
            //Do stuff with mTeams
            callback.onFeedFetched(...);
        }
    });

我的 GetFeedInteractor 类有一个名为 execute 的方法,我通过回调传递我稍后在 UI 中使用它来处理响应。所有这一切的问题是,目前我正在链接这样的响应:

@Override
public void execute(final Callback callback, String userSipId) {
    mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
        @Override
        public void onNext(List<SimpleTeam> simpleTeams) {
            super.onNext(simpleTeams);
            mTeams = simpleTeams;
            getFeedFromRepository(callback);
        }
    });
}

public void getFeedFromRepository(final Callback callback) {
    mFeedRepository.getFeedRx(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
        @Override
        protected void onServerSideError(Throwable errorResponse) {
            callback.onFeedFetchFailed("failed");
        }

        @Override
        protected void onSuccess(List<ApiFeedResponse> responseBody) {
            //Do stuff with mTeams
            List<BaseFeedItem> responseList = new ArrayList();

            for (ApiFeedResponse apiFeedResponse : responseBody) {
                responseList.add(FeedDataMapper.transform(apiFeedResponse));
            }

            callback.onFeedFetched(responseList);
        }
    });
}

正如您所看到的,一旦我从缓存交互器中获取了团队集合,我就会调用从同一个订阅者获取提要的方法。我不喜欢这个。我希望能够做一些更好的事情,比如使用 Observable.concat(getTeamsFromCache(), getFeedFromRepository()); 在订阅者中将调用链接到另一个 rx.Observable 并不是一件好事。我想我的问题是,如何链接两个使用不同订阅者的 rx.Observables?

更新:

ServerSubscriber 是我实现订阅改造服务的订阅者。它只是检查错误代码和一些东西。这是:

https://gist.github.com/4gus71n/65dc94de4ca01fb221a079b68c0570b5

默认订阅者是一个空的默认订阅者。这是:

https://gist.github.com/4gus71n/df501928fc5d24c2c6ed7740a6520330

TeamCache#getAllTeams() 返回 rx.Observable> FeedRepository#getFeed(int page, int offset) 返回 rx.Observable>

更新 2:

这是获取用户提要的交互器现在的样子:

@Override
    public void execute(final Callback callback, int offset, int pageSize) {
        User user = mGetLoggedUser.get();
        String userSipid = mUserSipid.get();

        mFeedRepository.getFeed(offset, pageSize) //Get items from the server-side
                .onErrorResumeNext(mFeedCache.getFeed(userSipid)) //If something goes wrong take it from cache
                .mergeWith(mPendingPostCache.getAllPendingPostsAsFeedItems(user)) //Merge the response with the pending posts
                .subscribe(new DefaultSubscriber<List<BaseFeedItem>>() {
                    @Override
                    public void onNext(List<BaseFeedItem> baseFeedItems) {
                        callback.onFeedFetched(baseFeedItems);
                    }

                    @Override
                    public void onError(Throwable e) {
                        if (e instanceof ServerSideException) {
                            //Handle the http error
                        } else if (e instanceof DBException) {
                            //Handle the database cache error
                        } else {
                            //Handle generic error
                        }
                    }
                });
    }
4

1 回答 1

3

我认为您错过了 RxJava 和反应式方法的要点,您不应该拥有具有 OO 层次结构和回调的不同订阅者。
您应该构造分离Observables的应该发出它处理的特定数据,没有Subscriber, 然后您可以根据需要链接您,最后,您拥有对链接流Observable预期的最终结果做出反应的订阅者。Observable

像这样的东西(使用 lambdas 有更细的代码):

TeamCache mTeamCache = new TeamCache();
FeedRepository mFeedRepository = new FeedRepository();

Observable.zip(teamsObservable, feedObservable, Pair::new)
    .subscribe(resultPair -> {
            //Do stuff with mTeams
            List<BaseFeedItem> responseList = new ArrayList(); 
            for (ApiFeedResponse apiFeedResponse : resultPair.second) {
                responseList.add(FeedDataMapper.transform(apiFeedResponse));
            }
        }, throwable -> {
             //handle errors
           }
     );

我已经使用了zip,而不是concat因为看起来你在这里有 2 个独立的调用,你想等待两者都完成(将它们“压缩”在一起)然后采取行动,但是当然,因为你已经分离了Observables流,你可以将它们链接在一起根据您的需要不同。

至于你ServerSubscriber的所有响应验证逻辑,它也应该是 rxify,所以你可以沿着你的服务器Observable流编写它。

像这样的东西(发出一些逻辑来简化,因为我不熟悉它......)

Observable<List<SimpleTeam>> teamsObservable = mTeamCache.getAllTeams();
Observable<List<ApiFeedResponse>> feedObservable = mFeedRepository.getFeed(0, 50)
            .flatMap(apiFeedsResponse -> {
                if (apiFeedsResponse.code() != 200) {
                    if (apiFeedsResponse.code() == 304) {
                        List<ApiFeedResponse> body = apiFeedsResponse.body();
                        return Observable.just(body);
                        //onNotModified(o.body());
                    } else {
                        return Observable.error(new ServerSideErrorException(apiFeedsResponse));
                    }
                } else {
                    //onServerSideResponse(o.body());
                    return Observable.just(apiFeedsResponse.body());
                }
            });
于 2017-04-20T19:20:06.563 回答