1

使用 RxJS 5 我想解决以下问题:

假设我从 REST API 中获取了一个类别列表。

基于这些类别中的每一个,我想从另一个 REST 端点获取子类别。

然后,基于这些子类别中的每一个,我想获取产品,并且对于这些产品中的每一个,我们都需要获取详细描述。

这个我已经解决了。问题是 ajax 调用升级,不到一分钟就进行了超过 30k 的调用,导致服务器瘫痪。

现在,由于这是一项夜间工作,只要它成功完成,我就可以接受它需要一些时间。

这就是我所拥有的:

getCategories() // Wraps ajax call and returns payload with array of categories
      .switchMap(categories => Observable.from(categories))
      .mergeMap(category => 
        getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
          .catch(err => {
            console.error('Error when fetching sub categories for category:', category);
            console.error(err);
            return Observable.empty();
          })
      )
      .mergeMap(subCategories => Observable.from(subCategories))
      .mergeMap(subCategory => 
        getProducts(subCategory) // Wraps ajax call and returns payload with array of products
        .catch(err => {
          console.error('Error when fetching products for sub category:', subCategory);
          console.error(err);
          return Observable.empty();
        })
      )
      .mergeMap(products => Observable.from(products))
      .mergeMap(product => 
        getProductDetails(product) // Wraps ajax call and returns payload with product details
        .catch(err => {
          console.error('Error when fetching product details for product:', product);
          console.error(err);
          return Observable.empty();
        })
      )
      .mergeMap(productDetails => saveToDb(productDetails))
      .catch(err => {
        console.error(err);
      })
      .subscribe();

在我获取类别的初始请求之后,我想:

每个获取子类别的调用都应该等到前一个调用完成。获取产品和产品详细信息时,一次只能进行 5 次 ajax 调用。在这 5 个调用完成后,我们触发接下来的 5 个调用,以此类推。

或者,它可以通过时间来控制,例如在我们进行下一个 ajax 调用之前等待 x 秒等。

根据上面的示例,我将如何使用 RxJS 很好地解决这个问题?

4

1 回答 1

3

mergeMap有一个重载,它采用可选的并发参数。您可以利用它来控制同时向您的服务器发送多少请求。

public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

  .mergeMap(category => 
    getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
      .catch(err => {
        console.error('Error when fetching sub categories for category:', category);
        console.error(err);
        return Observable.empty();
      }),
    null, 
    5 /* concurrency */
  )

还; mergeMap 会将任何 observableLike 转换为 Observables,因此如果您getSubCategories()返回一个将自动转换为 observable 的 Array,则无需进一步Observable.from()

于 2018-01-22T10:32:11.450 回答