5

我有一个用例,我需要限制传出 http 请求的数量。是的,我在服务器端确实有速率限制器,但前端也需要对活动 http 请求的数量进行限制。因此,我正在尝试实现一个滑动窗口协议,在任何时候我都会只有 n 个活动请求。

这种使用 Rxjs 的方法通常可以正常工作,请参见此处: https ://jsbin.com/pacicubeci/1/edit?js,console,output

但我不清楚如何对 http 拦截器使用相同的逻辑。我在下面的尝试在编译时失败,并出现以下错误:

类型“订阅”缺少类型“可观察<HttpEvent>”中的以下属性:_isScalar、源、运算符、提升和 114 多个。(2740)

有了这个,我怎样才能返回一个 observable 并同时在 http 拦截器上维护一个队列?我的方法有缺陷吗?我可以使用http拦截器来限制http速率吗?

@Injectable()
export class I1 implements HttpInterceptor {
  intercept(
    req: HttpRequest<any>,
    next: HttpHandler
  ): Observable<HttpEvent<any>> {
    const modified = req.clone({ setHeaders: { "Custom-Header-1": "1" } });

    return next
      .handle(req)
      .do((ev: HttpEvent<any>) => {
        if (ev instanceof HttpResponse) {
          console.log(ev);
        }
      })
      .pipe(
        bufferTime(1000, null, 1),
        filter(buffer => buffer.length > 0),
        concatMap(buffer => of(buffer).pipe(delay(1000)))
      )
      .subscribe(console.log);
      }
    }

https://stackblitz.com/edit/angular-interceptors-npqkjp?file=app/interceptors.ts

4

2 回答 2

4

如果您想了解更多关于拦截器和 HttpClientModule 如何在后台工作的信息,可以查看这篇文章:在 Angular 中探索 HttpClientModule

我的方法有缺陷吗?在这种情况下,问题在于next.handle预期会返回一个 Observable,但通过订阅它,它会返回一个 Subscription。

为了更好地理解原因,我将粘贴从上面链接的文章中复制的片段:

const obsBE$ = new Observable(obs => {
  timer(1000)
    .subscribe(() => {
      // console.log('%c [OBSERVABLE]', 'color: red;');

      obs.next({ response: { data: ['foo', 'bar'] } });

      // Stop receiving values!
      obs.complete();
    })

    return () => {
      console.warn("I've had enough values!");
    }
});

// Composing interceptors the chain
const obsI1$ = obsBE$
  .pipe(
    tap(() => console.log('%c [i1]', 'color: blue;')),
    map(r => ({ ...r, i1: 'intercepted by i1!' }))
  );

let retryCnt = 0;
const obsI2$ = obsI1$
  .pipe(
    tap(() => console.log('%c [i2]', 'color: green;')),
    map(r => { 
      if (++retryCnt <=3) {
        throw new Error('err!') 
      }

      return r;
    }),
    catchError((err, caught) => {
      return getRefreshToken()
        .pipe(
          switchMap(() => /* obsI2$ */caught),
        )
    })
  );

const obsI3$ = obsI2$
  .pipe(
    tap(() => console.log('%c [i3]', 'color: orange;')),
    map(r => ({ ...r, i3: 'intercepted by i3!' }))
  );

function getRefreshToken () {
  return timer(1500)
    .pipe(q
      map(() => ({ token: 'TOKEN HERE' })),
    );
}

function get () {
  return obsI3$
}

get()
  .subscribe(console.log)

/* 
-->
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
[i3]
{
  "response": {
    "data": [
      "foo",
      "bar"
    ]
  },
  "i1": "intercepted by i1!",
  "i3": "intercepted by i3!"
}
I've had enough values!
*/

StackBlitz 演示。

要点是拦截器创建某种,该链以负责发出实际请求的可观察对象结束。是链中的最后一个节点:

return new Observable((observer: Observer<HttpEvent<any>>) => {
  // Start by setting up the XHR object with request method, URL, and withCredentials flag.
  const xhr = this.xhrFactory.build();
  xhr.open(req.method, req.urlWithParams);
  if (!!req.withCredentials) {
    xhr.withCredentials = true;
  }
  /* ... */
})

如何在 http 拦截器上返回一个 observable 并同时维护一个队列

我认为解决此问题的一种方法是创建一个包含队列逻辑并使其intercept方法返回的拦截器Observable,以便可以订阅它:

const queueSubject = new Subject<Observable>();

const pendingQueue$ = queueSubject.pipe(
  // using `mergeAll` because the Subject's `values` are Observables
  mergeAll(limit),
  share(),
);

intercept (req, next) {
  // `next.handle(req)` - it's fine to do this, no request will fire until the observable is subscribed
  queueSubject.next(
    next.handle(req)
      .pipe(
        // not interested in `Sent` events
        filter(ev => ev instanceof HttpResponse),

        filter(resp => resp.url === req.url),
      )
  );

  return pendingQueue$;
}

使用filter运算符是因为通过 using share,响应将发送给所有订阅者。想象一下,你同步调用http.get了 5 次,所以 5 个新订阅者为share' 的主题,最后一个将收到它的响应,但其他请求的响应也是如此。因此使用可以filter为请求提供正确的响应,在这种情况下,通过将 request( req.url) 的 URL 与我们从 获得的 URL进行比较HttpResponse.url

observer.next(new HttpResponse({
  body,
  headers,
  status,
  statusText,
  url: url || undefined,
}));

上述代码段的链接


现在,我们为什么使用share()?

我们先看一个更简单的例子:

const s = new Subject();

const queue$ = s.pipe(
  mergeAll()
)

function intercept (req) {
  s.next(of(req));
  
  return queue$
}

// making request 1
intercept({ url: 'req 1' }).subscribe();

// making request 2
intercept({ url: 'req 2' }).subscribe();

// making request 3
intercept({ url: 'req 3' }).subscribe();

此时,主题s应该有 3 个订阅者。这是因为当您返回队列时,您会返回s.pipe(...),当您订阅它时,它与执行以下操作相同:

s.pipe(/* ... */).subscribe()

所以,这就是主题最后会有 3 个订阅者的原因。

现在让我们检查相同的代码段,但使用share()

const queue$ = s.pipe(
  mergeAll(),
  share()
);

// making request 1
intercept({ url: 'req 1' }).subscribe();

// making request 2
intercept({ url: 'req 2' }).subscribe();

// making request 3
intercept({ url: 'req 3' }).subscribe();

订阅请求 1 后,share将创建一个 Subject 实例,所有后续订阅者都将属于它,而不是属于Subject s。因此,s将只有一个订阅者。这将确保我们正确实现队列,因为尽管 Subjects只有一个订阅者,但它仍然会接受s.next()值,其结果将传递给另一个主题(来自 的那个share()),最终将发送响应给它的所有订阅者。

于 2021-01-19T13:52:22.593 回答
1

在您的拦截器上,您返回的是订阅,而不是 Observable。

如果您删除该行.subscribe(console.log),它应该编译得很好。订阅由消费者完成。

如果要控制台记录所有发出的内容,请使用tap(next => ...)运算符

编辑 - 嗯,它解决了编译错误,但我不确定它会按你的意愿工作......我不完全理解拦截器是如何工作的。

于 2021-01-19T10:16:34.060 回答