2

我有一个对象数组。对于每个对象,我需要触发一个异步请求(http 调用)。但我只想同时运行一定数量的请求。此外,如果我可以在所有请求完成后有一个同步点来执行某些代码,那将是很好的(但不是必需的)。

我尝试过以下建议:

使用 RxJS 一次限制请求数

如何限制flatMap的并发?

并行触发异步请求,但使用 rxjs 按顺序获取结果

还有更多......我什至尝试制作自己的运营商。

要么这些页面上的答案太旧而无法使用我的代码,要么我无法弄清楚如何将所有内容放在一起,以便所有类型都能很好地适应。

这是我到目前为止所拥有的:

for (const obj of objects) {
  this.myService.updateObject(obj).subscribe(value => {
    this.anotherService.set(obj);
  });
}

编辑1: 好的,我想我们到了!随着Juliuspschild的回答(两者似乎同样有效),我设法限制了请求的数量。但现在它只会发射第一批 4 个,而不会发射其余的。所以现在我有:

const concurrentRequests = 4;
from(objects)
  .pipe(
    mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
    tap(result => this.anotherService.set(result))
  ).subscribe();

我做错了subscribe()什么吗?

顺便说一句:mergeMapwithresultSelector参数已被弃用,所以我mergeMap没有使用它。此外, 的objmergeMap不可见tap,所以我不得不使用tap' 参数

编辑2:

确保你的观察者完成!(花了我一整天)

4

3 回答 3

3

您可以使用第三个参数mergeMap来限制并发内部订阅的数量。用于finalize在所有请求完成后执行某些操作:

const concurrentRequests = 5;
from(objects)
    .pipe(
        mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
        tap(res => this.anotherService.set(res))),
        finalize(() => console.log('Sequence complete'))
    );

请参阅Stackblitz上的示例。

于 2019-05-18T09:02:10.297 回答
2
from(objects).pipe(
  bufferCount(10),
  concatMap(objs => forkJoin(objs.map(obj => 
    this.myService.updateObject(obj).pipe(
      tap(value => this.anotherService.set(obj))
  )))),
  finalize(() => console.log('all requests are done'))
)

代码没有经过测试,但你明白了。让我知道是否需要任何错误或解释

于 2019-05-18T08:49:41.200 回答
0

我曾经遇到过同样的问题。当我尝试从服务器加载多个图像时。我不得不一个接一个地发送http请求。我使用等待的承诺达到了预期的结果。这是示例代码:

async ngOnInit() {
    for (const number of this.numbers) {
      await new Promise(resolve => {
        this.http.get(`https://jsonplaceholder.typicode.com/todos/${number}`).subscribe(
          data => {
            this.responses.push(data);
            console.log(data);
            resolve();
          }
        );
      });
    }
  }

主要想法是在您得到响应后解决承诺。使用这种技术,您可以提出自定义逻辑,以便在所有请求完成后执行一个方法。

这是堆栈闪电战。打开控制台以查看它的运行情况。:)

于 2019-05-18T08:46:07.820 回答