- 我想要一个 pubsub 系统,它有流的生产者和消费者,通过数据平面层但没有主题。
- 许多生产者可以多播到相同的流名称(例如,'filters.add'),并且多个消费者可以订阅该流。
- 生产者注册名称和流以创建“已发布流的流”。
如果一个新的生产者注册了一个特定的流名称,流向消费者的流会动态更新,以便接收到的数据是所有当前活动的已发布流与请求的名称的合并。
一个生产者流完成不应导致聚合流完成,而是在发生这种情况时被排除(更新所有消费者)。
在生产者注册后获得流的消费者应该收到该流的最后一个发出的值(如果迄今为止有一个)。
这是我到目前为止的代码和我希望满足的测试。我可以满足基本情况但不能缓存,因为:
- 已完成的流被排除在外,即使它们可能已经发出了最新的值;
.cache(1)
afterstream.publish().refCount()
重复4
从第一个输入Observable
,而不是8
从第二个输入;- 即使
pluckActiveStreams
注释掉了,这也不起作用,我只是无法让它与任何注释掉的方法一起工作......
我的问题是:
- 如何实现聚合流的缓存?注释掉的最后几行都没有
relevantStreams$
做到这一点。 - 我认为我需要从收到的聚合流中删除已完成的流,以便聚合流本身不会(永远)完成。这是(可能)使用 完成的
pluckActiveStreams
,它基于此处给出的解决方案,但仍为迟到的消费者提供流的最后发出的值,即使它可能来自现已完成的流?
我当前的实现已经在内部使用 ReplaySubjects 的 observables 作为生产者和消费者之间的“桥梁”成功地实现了这一点。但是这样:
- 所有生产者流都会自动变热;
- ReplaySubject 需要一个 hack 来确保它永远不会在任何多播到它的流中完成(否则它会关闭所有消费者的流,如此处所述);
- 一旦在第一个生产者注册时创建,代表特定生产者组的流名称的 ReplaySubject 永远不会再被冷处理,如果没有该名称的当前生产者,则不会被处置,或者像以前一样使用不同的参数重新实例化然后,该流的注册消费者将持有不正确的引用。因此,消费者需要对特定名称的活动流的响应式更新、声明性聚合的引用。
代码
const publishStream$ = new Subject();
function publishStream(name, stream) {
// publish streams as connectables to ensure a single source of truth
// (i.e., don't duplicate server requests)
publishStream$.next({ name, stream: stream.publish().refCount() });
}
const availableStreams$ = publishStream$
.startWith([])
.scan((streams, stream) => {
console.log('\nadding stream:', stream.name)
// (this would eventually be state operations to add/remove streams
// so this doesn't just keep growing)
return streams.concat(stream)
})
.publish();
availableStreams$.connect()
function getStream(name) {
const relevantStreams$ = availableStreams$
.do(res => console.log(`\nstream recalculates here for ${name}`))
.map(streams => streams.filter(streamObj => streamObj.name === name))
.filter(streams => streams.length)
.map(streamObjs => streamObjs.map(streamObj => streamObj.stream))
.map(pluckActiveStreams)
.switchMap(streams => Observable.merge(...streams))
// add caching for late subscribers
// per https://github.com/ReactiveX/rxjs/issues/1541?
// (none of the approaches below work):
//.multicast(() => new ReplaySubject()).refCount()
// .publishReplay(1).refCount()
//.share().repeat(1).publish()
//.cache(1)
return relevantStreams$;
}
function pluckActiveStreams(streams) {
const activeStreams = new Subject();
const elements = [];
const subscriptions = [];
streams.forEach(stream => {
var include = true;
const subscription = stream.subscribe(x => {
console.log('value', x)
}, x => {
console.log('error:', x)
}, x => {
console.log('completed', x)
include = false;
const i = elements.indexOf(stream);
if (i > -1) {
elements.splice(i, 1);
activeStreams.next(elements.slice());
}
});
if (include) {
elements.push(stream);
subscriptions.push(subscription);
}
});
activeStreams.next(elements.slice());
const pluckedStreams = Observable.using(
() => new Subscription(() => subscriptions.forEach(x => x.unsubscribe())),
() => activeStreams
);
return pluckedStreams;
}
测试
var acc1 = [];
var acc2 = [];
// all streams published get publish().refCount() internally
var obs1 = Observable.of(1, 2, 3, 4);
var obs2 = Observable.empty();
var obs3 = Observable.of(5, 6, 7, 8);
// consumer before publish - will receive values
var sub1 = dp.getStream('foo').subscribe(function (i) {
console.log('subscription receives: ', i)
acc1.push(i);
});
var pub1 = dp.publishStream('foo', obs1);
console.log('end of publishStream1')
var pub2 = dp.publishStream('foo', obs2);
console.log('end of publishStream2')
var pub3 = dp.publishStream('foo', obs3);
console.log('end of publishStream3')
// consumer after publish - should receive last cached value
// from active aggregated stream
var sub3 = dp.getStream('foo').subscribe(function (i) {
console.log("\ncached value received (I also don't fire :( ", i)
acc2.push(i);
});
var i = setTimeout(function () {
expect(acc1).to.deep.equal([1, 2, 3, 4, 5, 6, 7, 8]);
expect(acc2).to.deep.equal([8]);
done();
}, 10);