0

我将 RxKotlin 与 Retrofit 2 一起使用

我正在尝试弄清楚如何在单个操作中拥有动态的观察者列表。

第一个观察者应该触发操作,所有其他观察者应该等到操作完成/失败

操作完成后,我需要进行数据操作(存储在缓存/内存中),然后通知所有观察者。

这是我所做的:

class UserManager
{
    val observers = ArrayList<Observer<ArrayList<User>>>()
    var isFetchingUsers = false

    fun getUsers(observer: Observer<ArrayList<User>>)
    {
        observers.add(observer)

        if (isFetchingUsers)
        {
            return
        }

        api.getUserList.observeOn(AndroidSchedulers.mainThread()).subscribe(object : Observer<UserListResponse>
        {
            override fun onNext(response: UserListResponse)
            {
                // Do some manipulations on the response and notify all

                observers.forEach {
                    it.onNext(response.getUsers())
                }
            }

            override fun onError(e: Throwable)
            {
                observers.forEach {
                    it.onError(Throwable())
                }
            }

            override fun onComplete()
            {
                isFetchingUsers = false
                observers.clear()
            }

            override fun onSubscribe(d: Disposable)
            {
            }
        })
    }
}

这是 Retrofit observable 创建(这个是在 Java 中的..)

   /**
     * Get users
     */
    public Observable<UserListResponse> getUserList()
    {
        return mService.getUserList().subscribeOn(Schedulers.io());
    }

我确信有更好的方法来做到这一点

谢谢!

4

1 回答 1

0

您可以share()在可观察对象上使用运算符。只有第一个订阅会导致 observable 完成它的创建过程。一旦最后一个订阅者被取消订阅,observable 将执行其销毁过程。

Observable<Long> v;
...
Observable<Long> sharedObservable = v
  .doOnSubscribe( () -> logger.debug("subscribe") )
  .doOnUnsubscribe( () -> logger.debug("unsubscribe") )
  .share();

...
Subscription v1 = sharedObservable.subscribe();
Subscription v2 = sharedObservable.subscribe();
...
v1.unsubscribe();
v2.unsubscribe();

您将看到订阅操作只发生一次。您应该看到原始 observable 只发生了一次订阅操作,并且只有一次取消订阅。

于 2017-11-09T18:15:54.660 回答