0

在我的 todoApp 中,我以以下方式实现了 MediatorLiveData 以用于学习目的:

    private val todoListMediator = MediatorLiveData<NetworkResult<List<TodoEntity>>>()
 

    private var todoSource: LiveData<NetworkResult<List<TodoEntity>>> =
        MutableLiveData()
    val todoResponse: LiveData<NetworkResult<List<TodoEntity>>> get() = todoListMediator

    viewModelScope.launch(dispatcherProvider.main) {
        
        todoSource =
            todoRepository.getTodoList()
        todoListMediator.addSource(todoSource) { 
            todoListMediator.value = it
        }
    }

上面的代码工作正常。现在我想进行以下更改,但我不清楚如何实现它们。

  • 一旦 todoListMediator.addSource() 观察到 todoList: 1] 我想迭代那个原始的 Todo 列表并对每个元素进行网络调用并向它们添加更多字段。2] 我想保存新的待办事项列表(每个待办事项现在都有一些我们在步骤 1 中通过网络调用收到的额外字段) 3] 最后,我想将新的待办事项列表(带有额外字段)分配给 todoListMediator。

      // sudo to illustrate the above scenario 
     viewModelScope.launch(dispatcherProvider.main) {
                 //step.1 get original todo list
                 todoListMediator.addSource(todoSource) { it ->
    
                 // step 2. iterate over original todo list from step 1 and make network call to get extra field for each element and update the original list
                 //for example
                 val newFieldForTodoElement =  NetworkResult<PendingTodoValues?> = todoRepository.getPendingTodoValues()
    
                 // step 3. one all the original list is updated with new field values, save the Todo list in local db
    
                 // step 4. Then pass the todo list with new fields to mediator live data from db
                 todoListMediator.value = it
                 }
             }
    

任何对代码有详细解释的技巧都会对我的学习有很大帮助。谢谢!

4

1 回答 1

0

您可以将 RXjava 与 Flatmap 运算符一起使用

类似的事情可能会有所帮助?

  getPostsObservable()
            .subscribeOn(Schedulers.io())
            .flatMap(new Function<Post, ObservableSource<Post>>() {
                @Override
                public ObservableSource<Post> apply(Post post) throws Exception {
                    return getCommentsObservable(post);
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Post>() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposables.add(d);
                }

                @Override
                public void onNext(Post post) {
                    updatePost(post);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError: ", e);
                }

                @Override
                public void onComplete() {
                }
            });
}

private Observable<Post> getPostsObservable(){
    return ServiceGenerator.getRequestApi()
            .getPosts()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(new Function<List<Post>, ObservableSource<Post>>() {
                @Override
                public ObservableSource<Post> apply(final List<Post> posts) throws Exception {
                    adapter.setPosts(posts);
                    return Observable.fromIterable(posts)
                            .subscribeOn(Schedulers.io());
                }
            });
}

private void updatePost(final Post p){
    Observable
            .fromIterable(adapter.getPosts())
            .filter(new Predicate<Post>() {
                @Override
                public boolean test(Post post) throws Exception {
                    return post.getId() == p.getId();
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Post>() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposables.add(d);
                }

                @Override
                public void onNext(Post post) {
                    Log.d(TAG, "onNext: updating post: " + post.getId() + ", thread: " + Thread.currentThread().getName());
                    adapter.updatePost(post);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError: ", e);
                }

                @Override
                public void onComplete() {
                }
            });
}

private Observable<Post> getCommentsObservable(final Post post){
    return ServiceGenerator.getRequestApi()
            .getComments(post.getId())
            .map(new Function<List<Comment>, Post>() {
                @Override
                public Post apply(List<Comment> comments) throws Exception {

                    int delay = ((new Random()).nextInt(5) + 1) * 1000; // sleep thread for x ms
                    Thread.sleep(delay);
                    Log.d(TAG, "apply: sleeping thread " + Thread.currentThread().getName() + " for " + String.valueOf(delay)+ "ms");

                    post.setComments(comments);
                    return post;
                }
            })
            .subscribeOn(Schedulers.io());
于 2022-02-16T01:03:47.973 回答