0

我试图返回一个Observable在回调中异步创建的:

const mkAsync = (observer, delay) =>
  setTimeout(() => Observable.of('some result').subscribe(observer), delay)

const create = arg => {
  const ret = new Subject()
  mkAsync(ret, arg)
  return ret
}

因此,我使用 aSubject作为订阅Observable回调底层的单播代理。

这个解决方案的问题是,当我取消订阅Subject' 订阅时,取消订阅不会转发到底层Observable. 看起来我需要某种类型的引用来在Subject没有更多订阅者时取消订阅,但是在这种命令式回调样式中使用它时我无法弄清楚。

我必须保持mkAsync空白,并正在寻找替代实现。

这是正确的方法吗?是否有使用 的替代解决方案Subject

取消订阅主题时,如何确保Observable取消创建的(unsubscribe在 上调用)?Subscription

4

1 回答 1

0

这是一个相当广泛的问题,很难说你想用这个实现什么。我有两个想法:

第一件事是,根据您传递的参数refCount(),存在仅存ConnectableObservable在于从multicast(或)返回的类上的运算符。publish有关更多详细信息,请参见实现(基本上如果您没有设置任何selector功能):https ://github.com/ReactiveX/rxjs/blob/5.5.11/src/operators/multicast.ts

我能想到的第二个问题是你基本上是这样做的:

const ret = new Subject()
Observable.of(...).subscribe(ret);

这样做的问题是它.of会立即发出它的next项目,然后它会发送complete通知。主体具有内部状态,当Subject收到complete通知时,它会将自己标记为stopped并且永远不会发出任何东西

我怀疑这就是发生在你身上的事情。即使您返回 Subject 实例return ret并随后可能订阅它,您仍然不会收到任何东西,因为该 Subject 已经收到了complete通知。

于 2018-05-25T08:34:53.303 回答