172

我可以使用一些伪代码,或者更好的 Python。我正在尝试为 Python IRC 机器人实现一个限速队列,它可以部分工作,但是如果有人触发的消息少于限制(例如,速率限制是每 8 秒 5 条消息,而这个人只触发 4 条),并且下一个触发时间超过 8 秒(例如,16 秒后),机器人发送消息,但队列已满,机器人等待 8 秒,尽管由于 8 秒的时间已经过去,因此不需要它。

4

10 回答 10

259

这是最简单的算法,如果您只想在消息到达太快时丢弃消息(而不是排队,这是有道理的,因为队列可能会变得任意大):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

在这个解决方案中没有数据结构、计时器等,它工作得很干净:) 看到这一点,“津贴”最多以每秒 5/8 个单位的速度增长,即每八秒最多五个单位。转发的每条消息都会扣除一个单位,因此每八秒发送的消息不能超过五条。

注意rate应该是一个整数,即没有非零小数部分,否则算法将无法正常工作(实际速率不会是rate/per)。例如rate=0.5; per=1.0;不起作用,因为allowance永远不会增长到 1.0。但rate=1.0; per=2.0;工作正常。

于 2009-03-20T23:15:27.253 回答
52

在入队的函数之前使用这个装饰器 @RateLimited(ratepersec)。

基本上,这会检查自上次以来是否已经过 1/rate secs,如果没有,则等待剩余时间,否则不等待。这有效地限制了您的速率/秒。装饰器可以应用于您想要限制速率的任何功能。

在您的情况下,如果您希望每 8 秒最多发送 5 条消息,请在您的 sendToQueue 函数之前使用 @RateLimited(0.625) 。

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)
于 2009-03-20T19:51:44.137 回答
30

令牌桶实现起来相当简单。

从一个有 5 个令牌的存储桶开始。

每 5/8 秒:如果桶中的令牌少于 5 个,则添加一个。

每次要发送消息:如果bucket中的token≥1,则取出一个token发送消息。否则,等待/丢弃消息/无论如何。

(显然,在实际代码中,您将使用整数计数器而不是真正的令牌,并且您可以通过存储时间戳来优化每 5/8 秒的步骤)


再次阅读问题,如果速率限制每 8 秒完全重置一次,那么这里是一个修改:

last_send从很久以前(例如,在纪元)的时间戳开始。此外,从相同的 5 令牌桶开始。

执行每 5/8 秒规则。

每次发送消息时:首先检查是否last_send≥8 秒前。如果是这样,请填充存储桶(将其设置为 5 个令牌)。其次,如果桶中有令牌,则发送消息(否则,丢弃/等待/等)。第三,设置last_send到现在。

这应该适用于那种情况。


我实际上已经使用这样的策略(第一种方法)编写了一个 IRC 机器人。它在 Perl 中,而不是 Python 中,但这里有一些代码来说明:

这里的第一部分处理向存储桶添加令牌。您可以看到基于时间添加令牌的优化(第 2 到最后一行)然后最后一行将存储桶内容钳制到最大值(MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$conn 是一个被传递的数据结构。这是在一个例行运行的方法中(它计算下一次它什么时候有事情要做,并且休眠那么长时间或者直到它获得网络流量)。该方法的下一部分处理发送。它相当复杂,因为消息具有与之相关的优先级。

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

这是第一个队列,无论如何都会运行。即使它使我们的连接因洪水而被杀死。用于极其重要的事情,例如响应服务器的 PING。接下来,剩下的队列:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

最后,桶状态被保存回 $conn 数据结构(实际上在该方法中稍晚一点;它首先计算它多久会有更多工作)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

如您所见,实际的桶处理代码非常小——大约四行。其余代码是优先队列处理。该机器人具有优先级队列,例如,与它聊天的人无法阻止它执行重要的踢/禁止职责。

于 2009-03-20T19:04:30.803 回答
12

要阻塞处理直到可以发送消息,从而使更多消息排队,antti的漂亮解决方案也可以这样修改:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

它只是等到有足够的余量来发送消息。为了不以两倍的费率开始,津贴也可以用0初始化。

于 2011-06-20T17:39:35.310 回答
3

一种解决方案是为每个队列项目附加一个时间戳,并在 8 秒后丢弃该项目。您可以在每次添加队列时执行此检查。

仅当您将队列大小限制为 5 并在队列已满时丢弃任何添加时,这才有效。

于 2009-03-20T19:07:15.557 回答
2

保留最后五行发送的时间。保留排队的消息,直到第五个最近的消息(如果存在)过去至少 8 秒(last_five 作为时间数组):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()
于 2009-03-20T19:18:10.367 回答
2

如果有人仍然感兴趣,我将这个简单的可调用类与定时 LRU 键值存储结合使用来限制每个 IP 的请求率。使用双端队列,但可以重写为与列表一起使用。

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")
于 2013-06-10T19:23:38.193 回答
1

只是来自已接受答案的代码的 python 实现。

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler
于 2016-10-20T09:52:31.630 回答
0

这个怎么样:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}
于 2009-06-01T22:58:49.223 回答
0

我需要 Scala 的变体。这里是:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

以下是它的使用方法:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}
于 2016-10-14T03:39:42.813 回答