2

我试图在我的服务调用中使用重试模式(实际上是:ngrx/store 中的@Effects)并增加延迟间隔。由于我设法为一个调用提供了一个工作代码(即使它看起来没有优化我不想在我的问题中关注这个),我现在想将它提取到一个自定义的 Observable 运算符中,并重复使用它在我所有的服务电话中。

对于如何为新运算符设计 API/用法以及如何使其被 TypeScript 识别,我一无所知。

下面的代码肯定行不通,因为它可能会积累大量问题。

所以,现在我有一个呼叫/效果如下:

  @Effect()
  loadData$: Observable<Action> = this.actions$
    .ofType(ActionTypes.LOAD_DATA)
    .pluck('payload')
    .switchMap(params => {
      return this.myService.getData(params)
        .map(res => new LoadDataCompleteAction(res))

        // ...and this part would have to be extracted: 
        .retryWhen(attempts => Observable
          .zip(attempts, Observable.range(1, 5))
          .flatMap((n, i) => {
            if (i < 4) {
              return Observable.timer(1000 * i);
            } else {
              throw(n);
            }
          })
        )
    })
    .catch(err => Observable.of(new LoadDataFailed()));

而我所追求的是能够在其他效果中重用重试部分,并且具有类似于以下的模式:

  @Effect()
  loadData$: Observable<Action> = this.actions$
    .ofType(ActionTypes.LOAD_DATA)
    .pluck('payload')
    .switchMap(params => {
      return this.myService.getData(params)
        .map(res => new LoadDataCompleteAction(res))
        .retryWhen(attempts => Observable.retryOrThrow(attempts, maxAttempts)

         // or maybe - that's my design question
         .retryOrThrow(attempts, maxAttempts)
    })
    .catch(err => Observable.of(new LoadDataFailed()));

为简单起见,我们可以假设延迟回调模式 ( i * 1000) 对于整个应用程序是恒定的。

下面的代码是我的尝试,但它显然不起作用。

declare module 'rxjs/Observable' {
  interface Observable<T> {
    retryOrThrow<T>(attempts: any, max: number): Observable<T>;
  }     
}

Observable.prototype.retryOrThrow = function(attempt, max) {
  console.log('retryOrThrow called');

  return Observable.create(subscriber => {
    const source = this;
    const subscription = source.subscribe(() => {
        // important: catch errors from user-provided callbacks
        try {
          subscriber
            .zip(attempt, Observable.range(1, max + 1))
            .flatMap((n, i) => {
              console.log(n, i);
              if (i < max) {
                return Observable.timer(1000 * i);
              } else {
                throw(n);
              }
            });
        } catch (err) {
          subscriber.error(err);
        }
      },
      // be sure to handle errors and completions as appropriate and send them along
      err => subscriber.error(err),
      () => subscriber.complete());

    // to return now
    return subscription;
  });
};
  1. 我不确定如何为 new 运算符设计 API,什么语法最适合这里。
  2. 我不知道如何正确声明 new 运算符和 Observable 命名空间或模块,以便 TypeScript 识别新内容。

更新的服务电话:

  getMocky(){
    const u = Math.random();

    const okUrl = 'http://www.mocky.io/v2/58ffadf71100009b17f60044';
    const erUrl = 'http://www.mocky.io/v2/58ffae7f110000ba17f60046';
    return u > 0.6 ? this.http.get(okUrl) : this.http.get(erUrl);
  }
4

1 回答 1

4

我无法直接回答您的问题,因为我不知道如何使用自定义运算符扩展 rxjs。

幸运的是,你(和我)不需要知道。您真正要问的是如何预先定义一系列运算符以在多个地方重用。

您所需要的只是let-Operator,它使用起来非常简单。

首先将要重用的逻辑提取到返回可观察对象的函数中:

function retryOrThrow(maxAttempts, timeout) {
  return (source) =>
    source
      .retryWhen(attempts => Observable
        .zip(attempts, Observable.range(1, maxAttempts + 1))
        .flatMap((n, i) => {
          if (i < maxAttempts) {
            return Observable.timer(timeout * i);
          } else {
            throw (n);
          }
        })
      );
}

使用效果中的功能let

@Effect()
loadData$: Observable<Action> = this.actions$
  .ofType(ActionTypes.LOAD_DATA)
  .pluck('payload')
  .switchMap(params => {
    return this.myService.getData(params)
      .map(res => new LoadDataCompleteAction(res))
      // inject the pre-defined logic with the desired parameters
      .let(retryOrThrow(5, 1000))
    })
    .catch(err => Observable.of(new LoadDataFailed()));

了解它的最简单方法let是查看它的源代码。这真的很简单,因为它所做的只是将给定的函数应用于源可观察对象。

于 2017-04-23T21:31:00.823 回答