1

我正在使用速率限制器来限制路由的请求数量

请求被发送到一个频道,我想限制每秒处理的数量,但我很难理解我是否设置正确,我没有收到错误,但我不确定我是否我什至在使用速率限制器

这是添加到频道的内容:

type processItem struct {
    itemString string
}

这是通道和限制器:

itemChannel := make(chan processItem, 5)
itemThrottler := rate.NewLimiter(4, 1) //4 a second, no more per second (1)
var waitGroup sync.WaitGroup

项目被添加到频道:

case "newItem":
    waitGroup.Add(1)
    itemToExec := new(processItem)
    itemToExec.itemString = "item string"
    itemChannel <- *itemToExec

然后使用 goroutine 处理添加到通道中的所有内容:

go func() {
    defer waitGroup.Done()
    err := itemThrottler.Wait(context.Background())
    if err != nil {
        fmt.Printf("Error with limiter: %s", err)
        return
    }
    for item := range itemChannel {
        execItem(item.itemString) // the processing function
    }
    defer func() { <-itemChannel }()
}()
waitGroup.Wait()

有人可以确认发生以下情况:

  • execItem 函数在通道的每个成员上每秒运行 4 次

我不明白“err := itemThrottler.Wait(context.Background())”在代码中做了什么,这是如何调用的?

4

1 回答 1

2

...我不确定我是否在使用速率限制器

是的,您正在使用速率限制器。您正在限制case "newItem":代码分支的速率。

我不明白代码中的“err := itemThrottler.Wait(context.Background())”在做什么

itemThrottler.Wait(..)只会错开请求(4/s 即每 0.25 秒) -如果超过速率,它不会拒绝请求。那么这是什么意思?如果您在 1 秒内收到过多的 1000 个请求:

  • 4个请求将立即处理;但
  • 996 个请求将创建 996 个 go-routines 的积压,这些 go-routines 将阻塞

996 将以 4/s 的速率解除阻塞,因此待处理的 go-routines 的积压将在另外 4 分钟内不会清除(如果有更多请求进来,可能会更长时间)。积压的 go-routines 可能是也可能不是你想要的。如果没有,您可能需要使用Limiter.Allow - 如果false返回,则拒绝请求(即不创建 go-routine)并返回429错误(如果这是 HTTP 请求)。

最后,如果这是一个 HTTP 请求,你应该在调用时使用它的嵌入上下文,Wait例如

func (a *app) myHandler(w http.ResponseWriter, r *http.Request) {
    // ...

    err := a.ratelimiter(r.Context())

    if err != nil {
        // client http request most likely canceled (i.e. caller disconnected)
    }
}
于 2020-06-29T11:53:11.913 回答