我有一个 Web 服务器,它只支持一个非常简单的 API——计算过去一小时、一分钟和一秒收到的请求数。该服务器在世界范围内非常流行,每秒接收数千个请求。
旨在找到如何准确地将这 3 个值返回给每个请求?
请求一直在到来,因此每个请求的一小时、一分钟和一秒的窗口是不同的。如何管理每个请求的不同窗口,以便每个请求的计数正确?
我有一个 Web 服务器,它只支持一个非常简单的 API——计算过去一小时、一分钟和一秒收到的请求数。该服务器在世界范围内非常流行,每秒接收数千个请求。
旨在找到如何准确地将这 3 个值返回给每个请求?
请求一直在到来,因此每个请求的一小时、一分钟和一秒的窗口是不同的。如何管理每个请求的不同窗口,以便每个请求的计数正确?
如果需要 100% 的准确度:
拥有所有请求和 3 个计数的链接列表 - 最后一小时、最后一分钟和最后一秒。
您将有 2 个指向链接列表的指针 - 一分钟前和一秒前。
一小时前将在列表的末尾。每当最后一次请求的时间比当前时间早一个多小时时,将其从列表中删除并减少小时计数。
分钟和秒指针将分别指向一分钟和一秒前发生的第一个请求。每当请求的时间比当前时间早一分钟/秒以上时,将指针上移并减少分钟/秒计数。
当一个新请求进来时,将它添加到所有 3 个计数中,并将其添加到链表的前面。
对计数的请求将只涉及返回计数。
以上所有操作均为摊销常数时间。
如果低于 100% 的准确度是可以接受的:
上面的空间复杂度可能会有点多,这取决于您通常每秒收到多少请求;您可以通过稍微牺牲准确性来减少这种情况,如下所示:
有一个如上所述的链表,但仅限于最后一秒。也有3个计数。
然后有一个由 60 个元素组成的圆形数组,指示过去 60 秒中每一秒的计数。每当经过一秒时,从分钟计数中减去数组的最后一个(最旧的)元素,并将最后一秒计数添加到数组中。
最后 60 分钟有一个类似的圆形阵列。
准确性损失:分钟计数可能会在一秒钟内被所有请求关闭,而小时计数可能在一分钟内被所有请求关闭。
显然,如果您每秒只有一个请求或更少,这将没有任何意义。在这种情况下,您可以将最后一分钟保留在链表中,而最后 60 分钟只有一个循环数组。
对此还有其他变化 - 可以根据需要调整精度与空间使用比率。
删除旧元素的计时器:
如果仅在新元素进入时删除旧元素,它将被摊销恒定时间(某些操作可能需要更长的时间,但它会平均到恒定时间)。
如果您想要真正的恒定时间,您还可以运行一个计时器来删除旧元素,并且每次调用它(当然还有插入和检查计数)只会花费恒定时间,因为您最多删除一些自上次计时器滴答以来以恒定时间插入的元素。
要为 T 秒的时间窗口执行此操作,请使用队列数据结构,在其中将各个请求的时间戳在它们到达时排队。当您想读取在最近的 T 秒窗口内到达的请求数时,首先从队列的“旧”端删除那些早于 T 秒的时间戳,然后读取队列的大小。您还应该在向队列添加新请求时删除元素以保持其大小有界(假设传入请求的速率有界)。
该解决方案可以达到任意精度,例如毫秒精度。如果您对返回近似答案感到满意,您可以例如对于 T = 3600(一小时)的时间窗口,将同一秒内的请求合并到一个队列元素中,使队列大小以 3600 为界。我认为这将超过很好,但理论上会失去准确性。对于 T = 1,您可以根据需要在毫秒级别进行合并。
在伪代码中:
queue Q
proc requestReceived()
Q.insertAtFront(now())
collectGarbage()
proc collectGarbage()
limit = now() - T
while (! Q.empty() && Q.lastElement() < limit)
Q.popLast()
proc count()
collectGarbage()
return Q.size()
为什么不只使用循环数组?我们在该数组中有 3600 个元素。
index = 0;
Array[index % 3600] = count_in_one_second.
++index;
如果你想要最后一秒,返回这个数组的最后一个元素。如果你想要最后一分钟,返回最后 60 个元素的总和。如果你想要最后一小时,返回整个数组的总和(3600 个元素)。
他不是一个简单有效的解决方案吗?
谢谢
德里克
一种解决方案是这样的:
1) 使用长度为 3600 的圆形数组(每小时 60 * 60 秒)保存上一小时每秒的数据。
要记录下一秒的数据,请通过移动循环数组的头指针将最后一秒的数据放入循环数组中。
2)在循环数组的每个元素中,我们不是保存特定秒内的请求数,而是记录我们之前看到的请求数的累积和,一个周期的请求数可以计算为requests_sum.get(current_second) - requests_sum.get(current_second - number_of_seconds_in_this_period)
increament()
, getCountForLastMinute()
,等所有操作getCountForLastHour()
都可以及时完成O(1)
。
==================================================== ========================
这是一个如何工作的示例。
如果我们在最近 3 秒内有这样的请求计数:
1st second: 2 requests
2nd second: 4 requests
3rd second: 3 requests
圆形数组将如下所示:
sum = [2, 6, 9]
其中 6 = 4 + 2 和 9 = 2 + 4 + 3
在这种情况下:
1)如果要获取最后一秒的请求计数(第3秒的请求计数),只需计算sum[2] - sum[1] = 9 - 6 = 3
2)如果要获取最后两秒的请求数(第3秒的请求数和第2秒的请求数),只需计算sum[2] - sum[0] = 9 - 2 = 7
您可以在一小时内每秒创建一个大小为 60x60 的数组,并将其用作循环缓冲区。每个条目包含给定秒的请求数。当你移动到下一秒时,清除它并开始计数。当您处于数组的末尾时,您又从 0 开始,因此有效地清除了 1 小时之前的所有计数。
所以这三个都有 O(1) 的空间和时间复杂度。唯一的缺点是,它忽略了毫秒,但您也可以应用相同的概念来包括毫秒。
这是一个通用的 Java 解决方案,可以跟踪最后一分钟的事件数量。
我使用的原因ConcurrentSkipListSet
是因为它保证了搜索、插入和删除操作的平均时间复杂度为 O(log N)。您可以轻松更改下面的代码以使持续时间(默认为 1 分钟)可配置。
正如上面的答案所建议的,定期清理过时的条目是一个好主意,例如使用调度程序。
@Scope(value = "prototype")
@Component
@AllArgsConstructor
public class TemporalCounter {
@Builder
private static class CumulativeCount implements Comparable<CumulativeCount> {
private final Instant timestamp;
private final int cumulatedValue;
@Override
public int compareTo(CumulativeCount o) {
return timestamp.compareTo(o.timestamp);
}
}
private final CurrentDateTimeProvider currentDateTimeProvider;
private final ConcurrentSkipListSet<CumulativeCount> metrics = new ConcurrentSkipListSet<>();
@PostConstruct
public void init() {
Instant now = currentDateTimeProvider.getNow().toInstant();
metrics.add(new CumulativeCount(now, 0));
}
public void increment() {
Instant now = currentDateTimeProvider.getNow().toInstant();
int previousCount = metrics.isEmpty() ? 0 : metrics.last().cumulatedValue;
metrics.add(new CumulativeCount(now, previousCount + 1));
}
public int getLastCount() {
if (!metrics.isEmpty()) {
cleanup();
CumulativeCount previousCount = metrics.first();
CumulativeCount mostRecentCount = metrics.last();
if (previousCount != null && mostRecentCount != null) {
return mostRecentCount.cumulatedValue - previousCount.cumulatedValue;
}
}
return 0;
}
public void cleanup() {
Instant upperBoundInstant = currentDateTimeProvider.getNow().toInstant().minus(Duration.ofMinutes(1));
CumulativeCount c = metrics.lower(CumulativeCount.builder().timestamp(upperBoundInstant).build());
if (c != null) {
metrics.removeIf(o -> o.timestamp.isBefore(c.timestamp));
if (metrics.isEmpty()) {
init();
}
}
}
public void reset() {
metrics.clear();
init();
}
}
以下代码在 JS 中。它将返回 O(1) 中的计数。我为一次面试编写了这个程序,其中时间被预先定义为 5 分钟。但是您可以修改此代码几秒钟、几分钟等。让我知道事情的后续。
在 clean_hits 方法中,从我们创建的对象中删除每个条目(在我们的时间范围之外),并在删除条目之前从 totalCount 中减去该计数
this.hitStore = { "totalCount" : 0};
我必须在 Go 中解决这个问题,我认为我还没有看到这种方法,但它也可能非常特定于我的用例。
由于它连接到第 3 方 API 并且需要限制自己的请求,我只是在最后一秒保留了一个计数器,在最后 2 分钟保留了一个计数器(我需要的两个计数器)
var callsSinceLastSecond, callsSinceLast2Minutes uint64
然后,当呼叫计数器低于我的允许限制时,我会在单独的 go 例程中启动我的请求
for callsSinceLastSecond > 20 || callsSinceLast2Minutes > 100 {
time.Sleep(10 * time.Millisecond)
}
在每个 goroutine 结束时,我会自动递减计数器。
go func() {
time.Sleep(1 * time.Second)
atomic.AddUint64(&callsSinceLastSecond, ^uint64(0))
}()
go func() {
time.Sleep(2 * time.Minute)
atomic.AddUint64(&callsSinceLast2Minutes, ^uint64(0))
}()
到目前为止,这似乎工作没有任何问题,到目前为止进行了一些相当繁重的测试。
一个简单的时间戳列表怎么样?每次发出请求时,都会将当前时间戳附加到列表中。每次要检查是否低于速率限制时,首先删除超过 1 小时的时间戳以防止堆栈溢出(呵呵),然后计算最后一秒、分钟等时间戳的数量。
它可以在 Python 中轻松完成:
import time
requestsTimestamps = []
def add_request():
requestsTimestamps.append(time.time())
def requestsCount(delayInSeconds):
requestsTimestamps = [t for t in requestsTimestamps if t >= time.time() - 3600]
return len([t for t in requestsTimestamps if t >= time.time() - delayInSeconds])
我想这可以优化,但你看到了这个想法。
我的解决方案:
维护一个 3600 的哈希,其中包含一个计数、时间戳作为字段。
对于每个请求:
案例(1):如果 i/p 时间戳==hash[idx].timestamp,hash[count]++;
情况(2):如果 i/p 时间戳>hash[idx].timestamp,则 hash[idx].count=1 和 hash[idx].timestamp=inputTimeStamp
Case(3): : if i/p timestamp<hash[idx].count // 旧请求,可以忽略。
现在对于最后一秒、分钟、小时的任何查询:如上查找 idx,并且只要时间戳与给定的秒/范围/分钟匹配,就继续以循环方式从 idx 迭代回来。