2

在下面的代码中,我创建了一个简单的 observable,它产生一个值然后完成。然后我分享那个可观察的重播最后一个项目并订阅 3 次。第一次紧随其后,第二次在产生值之前,第三次在产生值并且可观察对象完成之后。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
  console.log('Creating observable');
  i++;
  setTimeout(() => {
     obs.onNext(i);
     obs.onCompleted();
  }, 2000);
}).shareReplay(1);

obs$.subscribe(
  data => console.log(`s1: data = ${data}`),
  () => {},
  () => console.log('finish s1')
);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s2: data = ${data}`),
    () => {},
    () => console.log('finish s2')

  );  
}, 1000);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s3: data = ${data}`),
    () => {},
    () => console.log('finish s3')

  );  
}, 6000);

您可以在 jsbin 上执行此操作

这导致以下大理石图

Actual
s1: -----1$
s2:   \--1$
s3:           \1$

但我希望

Expected
s1: -----1$
s2:   \--1$
s3:           \----2$

我可以理解为什么有人想要第一个行为,但我的理由是,与这个例子不同,我返回一个数字,我可能会返回一个易受取消订阅行为影响的对象,例如数据库连接。如果上面的大理石图表示一个数据库连接,在我调用的 dispose 方法中db.close(),在第三个订阅上我会遇到一个异常,因为我收到一个已发布的数据库处理程序作为值。(因为当第二次订阅完成时 refCount = 0 并且源被释放)。

这个例子还有另一个奇怪的事情是,即使它使用第一个值解析并在之后完成,它订阅源两次(正如你可以从重复的“创建可观察”中看到的那样)

我知道这个 github 问题谈到了这一点,但我缺少的是:

如何实现(在 RxJs4 和 5 中)一个共享的 observable,如果源 observable 尚未完成,则可以重播最后一项,并且如果它已完成(refCount = 0),则重新创建 observable。

在 RxJs5 中,我认为 share 方法解决了我的问题的重新连接部分,但不是共享部分。

在 RxJs4 我一无所知

如果可能的话,我想使用现有的运算符或主题来解决这个问题。我的直觉告诉我,我必须用这种逻辑创建一个不同的主题,但我还没有完全做到。

4

1 回答 1

1

关于shareReplay的一点:

shareReplay在返回的 observable 的剩余生命周期内保持相同的底层ReplaySubject实例。

完成后,您不能再将ReplaySubject任何值放入其中,但仍会重播。所以...

  1. 您第一次订阅 observable 并且超时开始。这i0增加到1
  2. 您第二次订阅 observable 并且超时已经过去了。
  3. 超时回调触发并发送出去onNext(i),然后onCompleted()
  4. onCompleted()信号完成ReplaySubject内部shareReplay,这意味着从现在开始,共享的 observable 将简单地重放它的值(即 1)并完成。

一般而言,关于共享可观察对象的一些信息:

另一个单独的问题是,由于您共享了 observable,它只会调用订阅者函数一次。这意味着它i只会增加一次。所以即使你没有onCompleted杀死你的底层证券ReplaySubject,你最终也不会将它增加到2.

这不是 RxJS 5

一个快速的判断方法是onNextvs next。您当前在您的示例中使用 RxJS 4,但是您已经用 RxJS 5 标记了它,并且您在 RxJS 5 中发现了一个问题。RxJS 5 是测试版,是一个完全重写 RxJS 4 的新版本。 API 更改主要是为了匹配当前处于阶段 1 的 es-observable 提案

更新示例

我已经更新了您的示例,为您提供了预期的结果

基本上,您想在前两个调用中使用 observable 的共享版本,在第三个调用中使用原始 observable。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
  console.log('Creating observable');
  i++;
  setTimeout(() => {
     obs.onNext(i);
     obs.onCompleted();
  }, 2000);
})


let shared$ = obs$.shareReplay(1);

shared$.subscribe(
  data => console.log(`s1: data = ${data}`),
  () => {},
  () => console.log('finish s1')
);

setTimeout( () => {
  shared$.subscribe(
    data => console.log(`s2: data = ${data}`),
    () => {},
    () => console.log('finish s2')

  );  
}, 1000);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s3: data = ${data}`),
    () => {},
    () => console.log('finish s3')

  );  
}, 6000);

无关

此外,protip:请务必为您调用的自定义可观察对象返回取消语义clearTimeout

于 2016-04-22T16:59:29.603 回答