2

我需要对 API 的请求进行速率限制,并且我正在考虑golang.org/x/time/rate为此目的使用本机包。为了稍微摆弄它的 API 并确保我的假设是正确的,我创建了这个测试,但似乎我在这里遗漏了一些东西:

package main

import (
    "github.com/stretchr/testify/require"
    "golang.org/x/time/rate"
    "sync"
    "testing"
)

func TestLimiter(t *testing.T) {
    limiter := rate.NewLimiter(rate.Limit(5),1)
    wg := sync.WaitGroup{}
    successful := 0

    for i:=1; i<=10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if limiter.Allow() {
                successful++
            }
        }()
    }
    wg.Wait()

    require.Equal(t, 5, successful)
    
    // This test fails with
    // Expected :5
    // Actual   :1
}

有人可以解释一下为什么会这样吗?速率限制器不应该允许 5 req/s 吗?

4

1 回答 1

3

首先,你有一个数据竞赛。多个 goroutine 写入successful时没有同步:未定义的行为。

您可以使用该sync/atomic软件包进行简单安全的计数:

limiter := rate.NewLimiter(rate.Limit(5), 1)
wg := sync.WaitGroup{}
successful := int32(0)

for i := 1; i <= 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        if limiter.Allow() {
            atomic.AddInt32(&successful, 1)
        }
    }()
}
wg.Wait()

fmt.Println(successful)

这将输出:

1

为什么?因为您每秒允许 5 个事件,所以每 0.2 秒有 1 个事件。启动 10 个 goroutine 并检查将花费不到 0.2 秒,因此只允许一个事件。

如果您在循环中添加 200 ms 睡眠,则所有内容都将被允许并且输出将是10

for i := 1; i <= 10; i++ {
    time.Sleep(200 * time.Millisecond)
    wg.Add(1)
    go func() {
        defer wg.Done()
        if limiter.Allow() {
            atomic.AddInt32(&successful, 1)
        }
    }()
}

如果您添加 100 毫秒的睡眠时间,那么平均一半的时间将被允许,并且输出将为5.

您想要的可能是允许 5 次突发 5 个事件/秒:

limiter := rate.NewLimiter(rate.Limit(5), 5)

在没有睡眠的情况下使用它limiter,您还将获得5. 那是因为在没有达到速率限制的情况下允许 5 个事件,而没有睡眠则不允许其余的事件。

于 2021-12-20T07:35:35.743 回答