0
  • 我想要一个 pubsub 系统,它有流的生产者和消费者,通过数据平面层但没有主题。
  • 许多生产者可以多播到相同的流名称(例如,'filters.add'),并且多个消费者可以订阅该流。
  • 生产者注册名称和流以创建“已发布流的流”。
  • 如果一个新的生产者注册了一个特定的流名称,流向消费者的流会动态更新,以便接收到的数据是所有当前活动的已发布流与请求的名称的合并。

  • 一个生产者流完成不应导致聚合流完成,而是在发生这种情况时被排除(更新所有消费者)。

  • 在生产者注册后获得流的消费者应该收到该流的最后一个发出的值(如果迄今为止有一个)。

这是我到目前为止的代码和我希望满足的测试。我可以满足基本情况但不能缓存,因为:

  • 已完成的流被排除在外,即使它们可能已经发出了最新的值;
  • .cache(1)afterstream.publish().refCount()重复4从第一个输入Observable,而不是8从第二个输入;
  • 即使pluckActiveStreams注释掉了,这也不起作用,我只是无法让它与任何注释掉的方法一起工作......

我的问题是:

  • 如何实现聚合流的缓存?注释掉的最后几行都没有relevantStreams$做到这一点。
  • 我认为我需要从收到的聚合流中删除已完成的流,以便聚合流本身不会(永远)完成。这是(可能)使用 完成的pluckActiveStreams,它基于此处给出的解决方案,但仍为迟到的消费者提供流的最后发出的值,即使它可能来自现已完成的流?

我当前的实现已经在内部使用 ReplaySubjects 的 observables 作为生产者和消费者之间的“桥梁”成功地实现了这一点。但是这样:

  1. 所有生产者流都会自动变热;
  2. ReplaySubject 需要一个 hack 来确保它永远不会在任何多播到它的流中完成(否则它会关闭所有消费者的流,如此所述);
  3. 一旦在第一个生产者注册时创建,代表特定生产者组的流名称的 ReplaySubject 永远不会再被冷处理,如果没有该名称的当前生产者,则不会被处置,或者像以前一样使用不同的参数重新实例化然后,该流的注册消费者将持有不正确的引用。因此,消费者需要对特定名称的活动流的响应式更新、声明性聚合的引用。

代码

const publishStream$ = new Subject();

function publishStream(name, stream) {
    // publish streams as connectables to ensure a single source of truth
    // (i.e., don't duplicate server requests)
    publishStream$.next({ name, stream: stream.publish().refCount() });
}

const availableStreams$ = publishStream$
    .startWith([])
    .scan((streams, stream) => {
        console.log('\nadding stream:', stream.name)
        // (this would eventually be state operations to add/remove streams
        // so this doesn't just keep growing)
        return streams.concat(stream)
    })
    .publish();
availableStreams$.connect()

function getStream(name) {

    const relevantStreams$ = availableStreams$
        .do(res => console.log(`\nstream recalculates here for ${name}`))
        .map(streams => streams.filter(streamObj => streamObj.name === name))
        .filter(streams => streams.length)
        .map(streamObjs => streamObjs.map(streamObj => streamObj.stream))
        .map(pluckActiveStreams)
        .switchMap(streams => Observable.merge(...streams))
        // add caching for late subscribers 
        // per https://github.com/ReactiveX/rxjs/issues/1541?
        // (none of the approaches below work):
        //.multicast(() => new ReplaySubject()).refCount()
        // .publishReplay(1).refCount()
        //.share().repeat(1).publish()
       //.cache(1)

    return relevantStreams$;

}


function pluckActiveStreams(streams) {
    const activeStreams = new Subject();
    const elements = [];
    const subscriptions = [];

    streams.forEach(stream => {
        var include = true;
        const subscription = stream.subscribe(x => {
            console.log('value', x)
        }, x => {
            console.log('error:', x)
        }, x => {
            console.log('completed', x)
            include = false;
            const i = elements.indexOf(stream);
            if (i > -1) {
                elements.splice(i, 1);
                activeStreams.next(elements.slice());
            }
        });
        if (include) {
            elements.push(stream);
            subscriptions.push(subscription);
        }
    });

    activeStreams.next(elements.slice());

    const pluckedStreams = Observable.using(
        () => new Subscription(() => subscriptions.forEach(x => x.unsubscribe())),
        () => activeStreams
    );
    return pluckedStreams;
}

测试

        var acc1 = [];
        var acc2 = [];

        // all streams published get publish().refCount() internally
        var obs1 = Observable.of(1, 2, 3, 4);
        var obs2 = Observable.empty();
        var obs3 = Observable.of(5, 6, 7, 8);

        // consumer before publish - will receive values
        var sub1 = dp.getStream('foo').subscribe(function (i) {
            console.log('subscription receives: ', i)
            acc1.push(i);
        });

        var pub1 = dp.publishStream('foo', obs1);
        console.log('end of publishStream1')
        var pub2 = dp.publishStream('foo', obs2);
        console.log('end of publishStream2')
        var pub3 = dp.publishStream('foo', obs3);
        console.log('end of publishStream3')

        // consumer after publish - should receive last cached value
        // from active aggregated stream
        var sub3 = dp.getStream('foo').subscribe(function (i) {
            console.log("\ncached value received (I also don't fire :( ", i)
            acc2.push(i);
        });

        var i = setTimeout(function () {
            expect(acc1).to.deep.equal([1, 2, 3, 4, 5, 6, 7, 8]);
            expect(acc2).to.deep.equal([8]);
            done();
        }, 10);
4

0 回答 0