我在使用 RxJS 和处理请求数组的正确方法时遇到问题。假设我有一个大约 50 个请求的数组,如下所示:
let requestCounter = 0;
function makeRequest(timeToDelay) {
return of('Request Complete!').pipe(delay(timeToDelay));
}
const requestArray = []
for(let i=0;i<25;i++){
requestArray.push(makeRequest(3000)); //3 seconds request
requestArray.push(makeRequest(1000)); //1 second request
}
我的目标是:
- 并行启动请求
- 只有5个可以同时运行
- 当一个请求完成时,数组中的下一个开始
- 当请求完成(成功或错误)时,我需要将变量“requestCounter”加一(requestCounter++)
- 当队列中的最后一个请求完成后,我需要订阅此事件并处理每个请求结果的数组
到目前为止,我最接近的做法是遵循这篇文章中的回复:
问题是我正在发现 RxJS,而这个例子对我来说太复杂了,我找不到如何处理每个请求的计数器。
希望您能够帮助我。(抱歉英语不好,这不是我的母语)
编辑:最终解决方案如下所示:
forkJoinConcurrent<T>(
observables: Observable<T>[],
concurrent: number
): Observable<T[]> {
return from(observables).pipe(
mergeMap((outerValue, outerIndex) => outerValue.pipe(
tap(// my code ),
last(),
catchError(error => of(error)),
map((innerValue, innerIndex) => ({index: outerIndex, value: innerValue})),
), concurrent),
toArray(),
map(a => (a.sort((l, r) => l.index - r.index).map(e => e.value))),
);
}