我正在尝试实现一个在我的应用程序Observable
之间共享的计时器Activities
。我正在一个作为 Dagger 单例的类上进行实现,我在Presenter
每个不同的Activity
.
我以这种方式创建了一次 Observable:
Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> this::doSomethingCool()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.share();
我使用以下功能从 Presenter 订阅:
public Observable<Status> register(Callback callback) {
PublishSubject<Status> subject = PublishSubject.create();
subject.subscribe(status -> {},
throwable -> L.LOGE(TAG, throwable.getMessage()),
() -> callback.onStatusChanged(mBasketStatus));
mObservable.subscribe(subject);
basketCounterCallback.onStatusChanged(status));
subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
}
我将其存储Subject
在Observable
每个演示者中,并调用:
obs.unsubscribeOn(AndroidSchedulers.mainThread())
取消订阅(在onPause()
方法中)。我还尝试使用调度程序取消订阅Schedulers.immediate()
但是无论如何都会调用回调 X 次(其中 X 是我订阅计时器的所有演示者),所以它不是取消订阅。日志"Unsubcribed from subject!"
也没有被调用。
如何正确退订每个主题?
提前致谢
编辑:
由于评论添加了更多实施细节:
这是我创建 Observable 并存储在Singleton
类成员中的部分StatusManager
(状态也是单例):
private Observable<BasketStatus> mObservable;
private Status mStatus;
public Observable<BasketStatus> start(long milliseconds, Status status, Callback callback) {
if (mObservable == null) mObservable = createObservable(milliseconds, status);
return register(callback);
}
private Observable<BasketStatus> createObservable(long milliseconds, Status status) {
mStatus = status;
return Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> status.upgradeStatus()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.share();
}
public Observable<BasketStatus> register(Callback callback) {
PublishSubject<Status> subject = PublishSubject.create();
subject.subscribe(status -> {},
throwable -> L.LOGE(TAG, throwable.getMessage()),
() -> callback.onStatusChanged(mStatus));
mObservable.subscribe(subject);
callback.onStatusChanged(mStatus));
subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
}
在start(...)
从Presenter
启动计时器调用register(...)
方法之后,我从下一个演示者调用方法:
class Presenter implements Callback {
private Observable<BasketStatus> mRegister;
@Inject
public Presenter(Status status, StatusManager statusManager) {
mRegister = statusManager.start(20000, status, this);
}
// Method called from onPause()
public void unregisterFromBasketStatus() {
mRegister.unsubscribeOn(Schedulers.immediate());
}
}
下一位主持人...
@Inject
public NextPresenter(StatusManager statusManager) {
mBasketStatusManager.register(this);
}