我有一个从两个流合并的源流。当源流发出事件时,我想调用订阅函数Meteor.subscribe
并保持打开状态,所以我使用mergeMap
. 当订阅准备好时,我通过管道连接到另一个mergeMap
来填充数据。它运行良好,直到我点击 100 次并且内存消耗猛增。问题是,如何限制mergeMap,不是限制在前N 个订阅concurrent: Number
,而是限制到最近的N 个订阅,就像滑动窗口一样?
function paginationCache$(): Observable<any> {
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('my/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
});
})
);
}
我想更详细地解释该代码中发生的事情。
在我的示例中,只要我单击 Web 界面中的按钮,源流(前管道)就永远不会完成,因此当我单击界面中的下一个或上一个按钮时它会发出更改。首先从源流中获取更改并将它们发送到后端 API(也有冲突的命名发布/订阅)。因此,当客户端上的数据可用时,我调用移动到第二个,但我无法销毁或停止流星的订阅。两个原因:1.我想实时更改所选数据,2.如果我停止流星的订阅,客户端的数据将被删除。因此,如果它在服务器上更新,那么现在它会不断更新选定的数据。merge
mergeMap
observer.next(subscription)
mergeMap
mergeMap
因此,在每个 UI 按钮单击(下一个、上一个)之后,我都有新的订阅链。如果原始数据表不大(1000 条记录)并且我只是单击了几次,那没关系。但是,我可以拥有超过 30000 个,并且可以多次单击我的按钮。
所以,我们的想法是让 mergeMap 像一个大小有限的队列,只保存最后 N 个订阅,但是当我单击按钮时,队列一直在变化。
最后编辑:工作代码:
function paginationCache$(): Observable<any> {
const N = 3;
const subscriptionsSubject = new Subject();
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
subscriptionsSubject.next();
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
}).pipe(
takeUntil(subscriptionsSubject
.pipe(
take(N),
filter((_, idx) => idx === N - 1)
)
)
);
})
);
}