在下面的代码中,我创建了一个简单的 observable,它产生一个值然后完成。然后我分享那个可观察的重播最后一个项目并订阅 3 次。第一次紧随其后,第二次在产生值之前,第三次在产生值并且可观察对象完成之后。
let i = 0;
let obs$ = Rx.Observable.create(obs => {
console.log('Creating observable');
i++;
setTimeout(() => {
obs.onNext(i);
obs.onCompleted();
}, 2000);
}).shareReplay(1);
obs$.subscribe(
data => console.log(`s1: data = ${data}`),
() => {},
() => console.log('finish s1')
);
setTimeout( () => {
obs$.subscribe(
data => console.log(`s2: data = ${data}`),
() => {},
() => console.log('finish s2')
);
}, 1000);
setTimeout( () => {
obs$.subscribe(
data => console.log(`s3: data = ${data}`),
() => {},
() => console.log('finish s3')
);
}, 6000);
这导致以下大理石图
Actual
s1: -----1$
s2: \--1$
s3: \1$
但我希望
Expected
s1: -----1$
s2: \--1$
s3: \----2$
我可以理解为什么有人想要第一个行为,但我的理由是,与这个例子不同,我返回一个数字,我可能会返回一个易受取消订阅行为影响的对象,例如数据库连接。如果上面的大理石图表示一个数据库连接,在我调用的 dispose 方法中db.close()
,在第三个订阅上我会遇到一个异常,因为我收到一个已发布的数据库处理程序作为值。(因为当第二次订阅完成时 refCount = 0 并且源被释放)。
这个例子还有另一个奇怪的事情是,即使它使用第一个值解析并在之后完成,它订阅源两次(正如你可以从重复的“创建可观察”中看到的那样)
我知道这个 github 问题谈到了这一点,但我缺少的是:
如何实现(在 RxJs4 和 5 中)一个共享的 observable,如果源 observable 尚未完成,则可以重播最后一项,并且如果它已完成(refCount = 0),则重新创建 observable。
在 RxJs5 中,我认为 share 方法解决了我的问题的重新连接部分,但不是共享部分。
在 RxJs4 我一无所知
如果可能的话,我想使用现有的运算符或主题来解决这个问题。我的直觉告诉我,我必须用这种逻辑创建一个不同的主题,但我还没有完全做到。