举一个冷酷的例子:
Observable<Integer> cold = Observable.create(subscriber -> {
try {
for (int i = 0; i <= 42; i++) {
// avoid doing unnecessary work
if (!subscriber.isUnsubscribed()) {
break;
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Throwable cause) {
subscriber.onError(cause);
}
});
它开始为每个新订阅者从头开始执行:
// starts execution
cold.subscribe(...)
如果订阅者提前取消订阅,则可以停止执行:
// stops execution
subscription.unsubscribe();
现在,如果我们有一些实际的业务逻辑而不是示例 for 循环(不需要为每个订阅者重播,而是实时的),那么我们正在处理 hot observable ......
PublishSubject<Integer> hot = PublishSubject.create();
Thread thread = new Thread(() -> {
try {
for (int i = 0; i < 42; i++) {
// how to avoid unnecessary work when no one is subscribed?
hot.onNext(i);
}
hot.onCompleted();
} catch (Throwable cause) {
hot.onError(cause);
}
});
当我们想要它开始时,我们可能会做
// stats work (although no one is subscribed)
thread.start();
因此第一个问题:如何仅在第一个观察者订阅时开始工作?(可能是可连接的可观察的?)
还有一个重要的问题:当最后一个订阅者退订时如何停止工作?(我不知道如何访问该主题的当前订阅,如果存在这样的解决方案,我希望找到没有共享全局状态的干净解决方案)
我能想到的一种解决方案是使用自定义运营商来提升主题,该运营商将管理订户......