2

我有一个从两个流合并的源流。当源流发出事件时,我想调用订阅函数Meteor.subscribe并保持打开状态,所以我使用mergeMap. 当订阅准备好时,我通过管道连接到另一个mergeMap来填充数据。它运行良好,直到我点击 100 次并且内存消耗猛增。问题是,如何限制mergeMap,不是限制在前N 个订阅concurrent: Number,而是限制到最近的N 个订阅,就像滑动窗口一样?

function paginationCache$(): Observable<any> {

    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes
                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('my/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);

                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                });

            })
        );
}

我想更详细地解释该代码中发生的事情。

在我的示例中,只要我单击 Web 界面中的按钮,源流(前管道)就永远不会完成,因此当我单击界面中的下一个或上一个按钮时它会发出更改。首先从源流中获取更改并将它们发送到后端 API(也有冲突的命名发布/订阅)。因此,当客户端上的数据可用时,我调用移动到第二个,但我无法销毁或停止流星的订阅。两个原因:1.我想实时更改所选数据,2.如果我停止流星的订阅,客户端的数据将被删除。因此,如果它在服务器上更新,那么现在它会不断更新选定的数据merge mergeMapobserver.next(subscription)mergeMap mergeMap

因此,在每个 UI 按钮单击(下一个、上一个)之后,我都有新的订阅链。如果原始数据表不大(1000 条记录)并且我只是单击了几次,那没关系。但是,我可以拥有超过 30000 个,并且可以多次单击我的按钮。

所以,我们的想法是让 mergeMap 像一个大小有限的队列,只保存最后 N 个订阅,但是当我单击按钮时,队列一直在变化。

最后编辑:工作代码:

function paginationCache$(): Observable<any> {
    const N = 3;
    const subscriptionsSubject = new Subject();
    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes

                subscriptionsSubject.next();

                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);
                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                }).pipe(
                    takeUntil(subscriptionsSubject
                        .pipe(
                            take(N),
                            filter((_, idx) => idx === N - 1)
                        )
                    )
                );
            })
        );
}

4

2 回答 2

1

在不考虑您的代码段的情况下,这就是我的处理方式:

不是并发的前 N ​​个订阅:数量,而是最近的 N 个,就像一个滑动窗口

如果我理解正确,你会想要这样的东西(假设N = 3):

N = 3

Crt             |  1 |  2 |  3 |
Subscriptions   | S1 | S2 | S3 |


When Crt = 4

Crt           | 2  | 3  |  4 |
Subscriptions | S2 | S3 | S4 |

如果是这种情况,这就是我的解决方法:

const subscriptionsSubject = new Subject();

src$.pipe(
  mergeMap(
    data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
      .pipe(
        takeUntil(
          subscriptionsSubject.pipe(
            take(N), // After `N` subscriptions, it will complete
            filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
          )
        )
      )
  )
)
于 2020-04-21T10:26:08.137 回答
0

我这里有两个想法:

  1. 你没有完成第二个内部 Observable。我想这不应该是您问题的根源,但如果可以的话,最好完成观察者:

    return () => {
      subscription.stop();
      observer.complete();
    };
    
  2. 您可以使用bufferCount创建 Observables 的滑动窗口,然后使用 订阅它们switchMap()。这些方面的东西:

    import { of, range } from 'rxjs'; 
    import { map, bufferCount, switchMap, shareReplay, tap } from 'rxjs/operators';
    
    range(10)
      .pipe(
        // turn each value to an Observable
        // `refCount` is used so that when `switchMap` switches to a new window
        // it won't trigger resubscribe to its sources and make more requests.
        map(v => of(v).pipe(shareReplay({ refCount: false, bufferSize: 1 }))),
        bufferCount(3, 1),
        tap(console.log), // for debugging purposes only
        switchMap(sourcesArray => merge(...sourcesArray)),
      )
      .subscribe(console.log);
    

    现场演示:https ://stackblitz.com/edit/rxjs-kuybbs?devtoolsheight=60

    我不完全确定这会模拟您的用例,但我也尝试包括在内shareReplay,这样它就不会触发Meteor.subscribe对同一个 Observable 的多次调用。我必须对您的代码进行工作演示才能自己进行测试。

于 2020-04-21T18:03:44.163 回答