3

举一个冷酷的例子:

Observable<Integer> cold = Observable.create(subscriber -> {
  try {
    for (int i = 0; i <= 42; i++) {

      // avoid doing unnecessary work
      if (!subscriber.isUnsubscribed()) {
        break;
      }

      subscriber.onNext(i);
    }
    subscriber.onCompleted();
  } catch (Throwable cause) {
    subscriber.onError(cause);
  }
});

它开始为每个新订阅者从头开始执行:

// starts execution
cold.subscribe(...)

如果订阅者提前取消订阅,则可以停止执行:

// stops execution
subscription.unsubscribe();

现在,如果我们有一些实际的业务逻辑而不是示例 for 循环(不需要为每个订阅者重播,而是实时的),那么我们正在处理 hot observable ......

PublishSubject<Integer> hot = PublishSubject.create();

Thread thread = new Thread(() -> {
  try {
    for (int i = 0; i < 42; i++) {
      // how to avoid unnecessary work when no one is subscribed?
      hot.onNext(i);
    }
    hot.onCompleted();
  } catch (Throwable cause) {
    hot.onError(cause);
  }
});

当我们想要它开始时,我们可能会做

// stats work (although no one is subscribed) 
thread.start();

因此第一个问题:如何仅在第一个观察者订阅时开始工作?(可能是可连接的可观察的?)

还有一个重要的问题:当最后一个订阅者退订时如何停止工作?(我不知道如何访问该主题的当前订阅,如果存在这样的解决方案,我希望找到没有共享全局状态的干净解决方案)

我能想到的一种解决方案是使用自定义运营商来提升主题,该运营商将管理订户......

4

1 回答 1

4

请参阅运营商refCount - http://reactivex.io/documentation/operators/refcount.html。此 Operator 将您的Observable转换为ConnectableObservable,并在第一个订阅者订阅时连接它,并在没有更多订阅时断开连接

于 2015-12-25T13:33:58.537 回答