32

我有一个 Web 服务器,它只支持一个非常简单的 API——计算过去一小时、一分钟和一秒收到的请求数。该服务器在世界范围内非常流行,每秒接收数千个请求。

旨在找到如何准确地将这 3 个值返回给每个请求?

请求一直在到来,因此每个请求的一小时、一分钟和一秒的窗口是不同的。如何管理每个请求的不同窗口,以便每个请求的计数正确?

4

10 回答 10

34

如果需要 100% 的准确度:

拥有所有请求和 3 个计数的链接列表 - 最后一小时、最后一分钟和最后一秒。

您将有 2 个指向链接列表的指针 - 一分钟前和一秒前。

一小时前将在列表的末尾。每当最后一次请求的时间比当前时间早一个多小时时,将其从列表中删除并减少小时计数。

分钟和秒指针将分别指向一分钟和一秒前发生的第一个请求。每当请求的时间比当前时间早一分钟/秒以上时,将指针上移并减少分钟/秒计数。

当一个新请求进来时,将它添加到所有 3 个计数中,并将其添加到链表的前面。

对计数的请求将只涉及返回计数。

以上所有操作均为摊销常数时间。

如果低于 100% 的准确度是可以接受的:

上面的空间复杂度可能会有点多,这取决于您通常每秒收到多少请求;您可以通过稍微牺牲准确性来减少这种情况,如下所示:

有一个如上所述的链表,但仅限于最后一秒。也有3个计数。

然后有一个由 60 个元素组成的圆形数组,指示过去 60 秒中每一秒的计数。每当经过一秒时,从分钟计数中减去数组的最后一个(最旧的)元素,并将最后一秒计数添加到数组中。

最后 60 分钟有一个类似的圆形阵列。

准确性损失:分钟计数可能会在一秒钟内被所有请求关闭,而小时计数可能在一分钟内被所有请求关闭。

显然,如果您每秒只有一个请求或更少,这将没有任何意义。在这种情况下,您可以将最后一分钟保留在链表中,而最后 60 分钟只有一个循环数组。

对此还有其他变化 - 可以根据需要调整精度与空间使用比率。

删除旧元素的计时器:

如果仅在新元素进入时删除旧元素,它将被摊销恒定时间(某些操作可能需要更长的时间,但它会平均到恒定时间)。

如果您想要真正的恒定时间,您还可以运行一个计时器来删除旧元素,并且每次调用它(当然还有插入和检查计数)只会花费恒定时间,因为您最多删除一些自上次计时器滴答以来以恒定时间插入的元素。

于 2013-07-10T08:45:22.063 回答
14

要为 T 秒的时间窗口执行此操作,请使用队列数据结构,在其中将各个请求的时间戳在它们到达时排队。当您想读取在最近的 T 秒窗口内到达的请求数时,首先从队列的“旧”端删除那些早于 T 秒的时间戳,然后读取队列的大小。您还应该在向队列添加新请求时删除元素以保持其大小有界(假设传入请求的速率有界)。

该解决方案可以达到任意精度,例如毫秒精度。如果您对返回近似答案感到满意,您可以例如对于 T = 3600(一小时)的时间窗口,将同一秒内的请求合并到一个队列元素中,使队列大小以 3600 为界。我认为这将超过很好,但理论上会失去准确性。对于 T = 1,您可以根据需要在毫秒级别进行合并。

在伪代码中:

queue Q

proc requestReceived()
  Q.insertAtFront(now())
  collectGarbage()

proc collectGarbage()
  limit = now() - T
  while (! Q.empty() && Q.lastElement() < limit)
    Q.popLast()

proc count()
  collectGarbage()
  return Q.size()
于 2013-07-10T05:14:03.567 回答
9

为什么不只使用循环数组?我们在该数组中有 3600 个元素。

index = 0;
Array[index % 3600] = count_in_one_second. 
++index;

如果你想要最后一秒,返回这个数组的最后一个元素。如果你想要最后一分钟,返回最后 60 个元素的总和。如果你想要最后一小时,返回整个数组的总和(3600 个元素)。

他不是一个简单有效的解决方案吗?

谢谢

德里克

于 2015-01-22T23:44:12.043 回答
6

一种解决方案是这样的:

1) 使用长度为 3600 的圆形数组(每小时 60 * 60 秒)保存上一小时每秒的数据。

要记录下一秒的数据,请通过移动循环数组的头指针将最后一秒的数据放入循环数组中。

2)在循环数组的每个元素中,我们不是保存特定秒内的请求数,而是记录我们之前看到的请求数的累积和,一个周期的请求数可以计算为requests_sum.get(current_second) - requests_sum.get(current_second - number_of_seconds_in_this_period)

increament(), getCountForLastMinute(),等所有操作getCountForLastHour()都可以及时完成O(1)

==================================================== ========================

这是一个如何工作的示例。

如果我们在最近 3 秒内有这样的请求计数: 1st second: 2 requests 2nd second: 4 requests 3rd second: 3 requests

圆形数组将如下所示: sum = [2, 6, 9] 其中 6 = 4 + 2 和 9 = 2 + 4 + 3

在这种情况下:

1)如果要获取最后一秒的请求计数(第3秒的请求计数),只需计算sum[2] - sum[1] = 9 - 6 = 3

2)如果要获取最后两秒​​的请求数(第3秒的请求数和第2秒的请求数),只需计算sum[2] - sum[0] = 9 - 2 = 7

于 2017-12-06T03:14:00.387 回答
2

您可以在一小时内每秒创建一个大小为 60x60 的数组,并将其用作循环缓冲区。每个条目包含给定秒的请求数。当你移动到下一秒时,清除它并开始计数。当您处于数组的末尾时,您又从 0 开始,因此有效地清除了 1 小时之前的所有计数。

  1. 小时:返回所有元素的总和
  2. For Minute:返回最后 60 个条目的总和(来自 currentIndex)
  3. 对于第二个:返回 currentIndex 的计数

所以这三个都有 O(1) 的空间和时间复杂度。唯一的缺点是,它忽略了毫秒,但您也可以应用相同的概念来包括毫秒。

于 2014-02-10T01:37:33.263 回答
1

这是一个通用的 Java 解决方案,可以跟踪最后一分钟的事件数量。

我使用的原因ConcurrentSkipListSet是因为它保证了搜索、插入和删除操作的平均时间复杂度为 O(log N)。您可以轻松更改下面的代码以使持续时间(默认为 1 分钟)可配置。

正如上面的答案所建议的,定期清理过时的条目是一个好主意,例如使用调度程序。

@Scope(value = "prototype")
@Component
@AllArgsConstructor
public class TemporalCounter {

    @Builder
    private static class CumulativeCount implements Comparable<CumulativeCount> {

        private final Instant timestamp;
        private final int cumulatedValue;

        @Override
        public int compareTo(CumulativeCount o) {
            return timestamp.compareTo(o.timestamp);
        }
    }

    private final CurrentDateTimeProvider currentDateTimeProvider;
    private final ConcurrentSkipListSet<CumulativeCount> metrics = new ConcurrentSkipListSet<>();

    @PostConstruct
    public void init() {
        Instant now = currentDateTimeProvider.getNow().toInstant();
        metrics.add(new CumulativeCount(now, 0));
    }

    public void increment() {
        Instant now = currentDateTimeProvider.getNow().toInstant();
        int previousCount = metrics.isEmpty() ? 0 : metrics.last().cumulatedValue;
        metrics.add(new CumulativeCount(now, previousCount + 1));
    }

    public int getLastCount() {
        if (!metrics.isEmpty()) {
            cleanup();

            CumulativeCount previousCount = metrics.first();
            CumulativeCount mostRecentCount = metrics.last();
            if (previousCount != null && mostRecentCount != null) {
                return mostRecentCount.cumulatedValue - previousCount.cumulatedValue;
            }
        }
        return 0;
    }

    public void cleanup() {
        Instant upperBoundInstant = currentDateTimeProvider.getNow().toInstant().minus(Duration.ofMinutes(1));
        CumulativeCount c = metrics.lower(CumulativeCount.builder().timestamp(upperBoundInstant).build());
        if (c != null) {
            metrics.removeIf(o -> o.timestamp.isBefore(c.timestamp));
            if (metrics.isEmpty()) {
                init();
            }
        }
    }

    public void reset() {
        metrics.clear();
        init();
    }
}
于 2020-06-28T15:35:18.683 回答
1

以下代码在 JS 中。它将返回 O(1) 中的计数。我为一次面试编写了这个程序,其中时间被预先定义为 5 分钟。但是您可以修改此代码几秒钟、几分钟等。让我知道事情的后续。

  1. 创建一个以毫秒为键,以计数器为值的对象
  2. 添加一个名为 totalCount 的属性并将其预定义为 0
  3. 在步骤 1 中定义的每个命中增量计数器日志和总计数
  4. 添加一个名为 clean_hits 的方法,每毫秒调用一次该方法
  5. 在 clean_hits 方法中,从我们创建的对象中删除每个条目(在我们的时间范围之外),并在删除条目之前从 totalCount 中减去该计数

    this.hitStore = { "totalCount" : 0};

于 2016-08-21T07:12:35.487 回答
1

我必须在 Go 中解决这个问题,我认为我还没有看到这种方法,但它也可能非常特定于我的用例。

由于它连接到第 3 方 API 并且需要限制自己的请求,我只是在最后一秒保留了一个计数器,在最后 2 分钟保留了一个计数器(我需要的两个计数器)

var callsSinceLastSecond, callsSinceLast2Minutes uint64

然后,当呼叫计数器低于我的允许限制时,我会在单独的 go 例程中启动我的请求

for callsSinceLastSecond > 20 || callsSinceLast2Minutes > 100 {
    time.Sleep(10 * time.Millisecond)
}

在每个 goroutine 结束时,我会自动递减计数器。

go func() {
    time.Sleep(1 * time.Second)
    atomic.AddUint64(&callsSinceLastSecond, ^uint64(0))
}()

go func() {
    time.Sleep(2 * time.Minute)
    atomic.AddUint64(&callsSinceLast2Minutes, ^uint64(0))
}()

到目前为止,这似乎工作没有任何问题,到目前为止进行了一些相当繁重的测试。

于 2018-04-15T16:27:27.460 回答
0

一个简单的时间戳列表怎么样?每次发出请求时,都会将当前时间戳附加到列表中。每次要检查是否低于速率限制时,首先删除超过 1 小时的时间戳以防止堆栈溢出(呵呵),然后计算最后一秒、分钟等时间戳的数量。

它可以在 Python 中轻松完成:

import time

requestsTimestamps = []

def add_request():
    requestsTimestamps.append(time.time())

def requestsCount(delayInSeconds):
    requestsTimestamps = [t for t in requestsTimestamps if t >= time.time() - 3600]
    return len([t for t in requestsTimestamps if t >= time.time() - delayInSeconds])

我想这可以优化,但你看到了这个想法。

于 2018-01-14T04:30:31.920 回答
0

我的解决方案:

  1. 维护一个 3600 的哈希,其中包含一个计数、时间戳作为字段。

  2. 对于每个请求:

    • 通过时间戳%3600 获取 idx(当前元素的数组索引)。
    • 如果 hash[idx].count=0,那么 hash[idx].count=1 和 hash[idx].timestamp=inputTimeStamp
    • 如果 hash[idx].count>0 ,那么

    案例(1):如果 i/p 时间戳==hash[idx].timestamp,hash[count]++;

    情况(2):如果 i/p 时间戳>hash[idx].timestamp,则 hash[idx].count=1 和 hash[idx].timestamp=inputTimeStamp

    Case(3): : if i/p timestamp<hash[idx].count // 旧请求,可以忽略。

现在对于最后一秒、分钟、小时的任何查询:如上查找 idx,并且只要时间戳与给定的秒/范围/分钟匹配,就继续以循环方式从 idx 迭代回来。

于 2020-10-17T13:33:48.837 回答