3

我正在尝试实现一个在我的应用程序Observable之间共享的计时器Activities。我正在一个作为 Dagger 单例的类上进行实现,我在Presenter每个不同的Activity.

我以这种方式创建了一次 Observable:

Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> this::doSomethingCool()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .share();

我使用以下功能从 Presenter 订阅:

public Observable<Status> register(Callback callback) {

    PublishSubject<Status> subject = PublishSubject.create();
    subject.subscribe(status -> {},
            throwable -> L.LOGE(TAG, throwable.getMessage()),
            () -> callback.onStatusChanged(mBasketStatus));

    mObservable.subscribe(subject);
    basketCounterCallback.onStatusChanged(status));

    subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
    return subject.asObservable();
}

我将其存储SubjectObservable每个演示者中,并调用: obs.unsubscribeOn(AndroidSchedulers.mainThread()) 取消订阅(在onPause()方法中)。我还尝试使用调度程序取消订阅Schedulers.immediate()

但是无论如何都会调用回调 X 次(其中 X 是我订阅计时器的所有演示者),所以它不是取消订阅。日志"Unsubcribed from subject!"也没有被调用。

如何正确退订每个主题?

提前致谢

编辑:

由于评论添加了更多实施细节:

这是我创建 Observable 并存储在Singleton类成员中的部分StatusManager(状态也是单例):

private Observable<BasketStatus> mObservable;
private Status mStatus;

public Observable<BasketStatus> start(long milliseconds, Status status, Callback callback) {

    if (mObservable == null) mObservable = createObservable(milliseconds, status);

    return register(callback);
}

private Observable<BasketStatus> createObservable(long milliseconds, Status status) {

    mStatus = status;

    return Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> status.upgradeStatus()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .share();
}

public Observable<BasketStatus> register(Callback callback) {

    PublishSubject<Status> subject = PublishSubject.create();
    subject.subscribe(status -> {},
            throwable -> L.LOGE(TAG, throwable.getMessage()),
            () -> callback.onStatusChanged(mStatus));

    mObservable.subscribe(subject);
    callback.onStatusChanged(mStatus));

    subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
    return subject.asObservable();
}

start(...)Presenter启动计时器调用register(...)方法之后,我从下一个演示者调用方法:

class Presenter implements Callback {

    private Observable<BasketStatus> mRegister;

    @Inject
    public Presenter(Status status, StatusManager statusManager) {
        mRegister = statusManager.start(20000, status, this);
    }

    // Method called from onPause()
    public void unregisterFromBasketStatus() {
         mRegister.unsubscribeOn(Schedulers.immediate());
    }
}

下一位主持人...

@Inject
public NextPresenter(StatusManager statusManager) {
    mBasketStatusManager.register(this);
}
4

1 回答 1

4

正如我在评论中提到的,你没有得到unsubscribeOn操作员的行为。它不会取消订阅任何内容,而是告诉每个订阅者,取消订阅发生时它应该在哪里工作。这也没有任何意义,因为您不是从 Observable 取消订阅,而是从表示观察者与流本身之间的连接的Subscription取消订阅。

回到你的问题。您的代码现在设计的方式,您返回的主题完全没有用。您正在订阅注册方法中可观察的计时器,但您将这些通知转发给此后任何人都不会订阅的主题。如果您真的需要它们,例如在我们看不到的代码的某些部分,您需要存储subscribe()作为对象的方法的结果,并在需要时Subscription调用它(我假设)。unsubscribe()unregisterFromBasketStatus

但是代码中还有更多问题。例如 :

subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable(); 

在第一行中,您不会将该操作的结果存储在任何地方。由于 Observables 是不可变的结构,每个操作符都会创建一个新的流来接收来自前一个的通知。由此,很明显,当您subject.asObservable()在第二行返回时,您不是从第一行返回已修改的 observable,而是返回未修改的旧的。要更正此问题,只需将subject变量替换为结果:subject = subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));

其次,使用流的原因之一是完全替换回调。当然,它会起作用,但你正在减轻 Rx 为代码库带来的许多好处。然而,它的可读性要差得多,任何在你之后必须处理你的代码的人都会诅咒你到地狱,以至于他可能真的成功:-)) 我理解,从一开始就很难学习 Rx,但是尝试尽可能避免这种情况。

于 2017-02-07T10:08:26.140 回答