0

我在使用 RxJS 和处理请求数组的正确方法时遇到问题。假设我有一个大约 50 个请求的数组,如下所示:

let requestCounter = 0;
function makeRequest(timeToDelay) {
  return of('Request Complete!').pipe(delay(timeToDelay));
}

const requestArray = []
for(let i=0;i<25;i++){
  requestArray.push(makeRequest(3000)); //3 seconds request
  requestArray.push(makeRequest(1000)); //1 second request
}

我的目标是:

  • 并行启动请求
  • 只有5个可以同时运行
  • 当一个请求完成时,数组中的下一个开始
  • 当请求完成(成功或错误)时,我需要将变量“requestCounter”加一(requestCounter++)
  • 当队列中的最后一个请求完成后,我需要订阅此事件并处理每个请求结果的数组

到目前为止,我最接近的做法是遵循这篇文章中的回复:

具有并发工作者的 RxJS 并行队列?

问题是我正在发现 RxJS,而这个例子对我来说太复杂了,我找不到如何处理每个请求的计数器。

希望您能够帮助我。(抱歉英语不好,这不是我的母语)

编辑:最终解决方案如下所示:

forkJoinConcurrent<T>(
    observables: Observable<T>[],
    concurrent: number
  ): Observable<T[]> {
    return from(observables).pipe(
      mergeMap((outerValue, outerIndex) => outerValue.pipe(
        tap(// my code ),
        last(),
        catchError(error => of(error)),
        map((innerValue, innerIndex) => ({index: outerIndex, value: innerValue})),
      ), concurrent),
      toArray(),
      map(a => (a.sort((l, r) => l.index - r.index).map(e => e.value))),
    );
  }
4

1 回答 1

0

首先你应该使用 subject 来存储请求队列,然后看看mergeMapoperator,concurrency你可以设置一个参数来设置最大并发以及一个index变量来跟踪调用次数

https://www.learnrxjs.io/operators/transformation/mergemap.html

const requestArray=new Subject()
for(let i=0;i<25;i++){
  requestArray.next(makeRequest(3000)); //3 seconds request
  requestArray.next(makeRequest(1000)); //1 second request
}

requestArray.pipe(
mergeMap((res,index)=>of([res,index]),
res=>res,5),
map((res,index)=>{if(index===25) .... do your thing ; return res;})
).subscribe(console.log)
于 2019-07-01T01:39:58.723 回答