10

我需要限制时间段deltaT期间允许的事件数n。我能想到的任何方法,空间都是O(m),其中m是每个deltaTO(deltaT/r)发送的最大事件请求数,其中r是可接受的分辨率。

编辑:deltaT 是相对于时间戳的滑动时间窗口。

例如:保持事件时间戳的循环缓冲区。在事件裁剪所有比t-deltaT更早的时间戳。如果时间戳数超过n则拒绝事件。将时间戳添加到缓冲区。

或者,初始化一个大小为deltaT/r的整数的循环桶缓冲区,该缓冲区按相对于当前分辨率r的时间进行索引。维护指针i发生事件时, i自上次事件以来的时间除以r递增。将原始i和新 i 之间的缓冲区归零。在i处增加。拒绝,如果错误的总和超过n

有什么更好的方法?


我刚刚在 c# 中实现了我上面的第二个建议,固定deltaT为 1 s,固定分辨率为 10 ms。

public class EventCap
{
    private const int RES = 10; //resolution in ms

    private int _max;
    private readonly int[] _tsBuffer;
    private int p = 0;
    private DateTime? _lastEventTime;
    private int _length = 1000 / RES;

    public EventCap(int max)
    {
        _max = max;

        _tsBuffer = new int[_length];
    }

    public EventCap()
    {
    }

    public bool Request(DateTime timeStamp)
    {
        if (_max <= 0)
            return true;

        if (!_lastEventTime.HasValue)
        {
            _lastEventTime = timeStamp;
            _tsBuffer[0] = 1;
            return true;
        }

        //A
        //Mutually redundant with B
        if (timeStamp - _lastEventTime >= TimeSpan.FromSeconds(1))
        {
            _lastEventTime = timeStamp;
            Array.Clear(_tsBuffer, 0, _length);
            _tsBuffer[0] = 1;
            p = 0;
            return true;
        }

        var newP = (timeStamp - _lastEventTime.Value).Milliseconds / RES + p;

        if (newP < _length)
            Array.Clear(_tsBuffer, p + 1, newP - p);

        else if (newP > p + _length)
        {
            //B
            //Mutually redundant with A
            Array.Clear(_tsBuffer, 0, _length);
        }
        else
        {
            Array.Clear(_tsBuffer, p + 1, _length - p - 1);
            Array.Clear(_tsBuffer, 0, newP % _length);
        }

        p = newP % _length;
        _tsBuffer[p]++;
        _lastEventTime = timeStamp;

        var sum = _tsBuffer.Sum();

        return sum <= 10;
    }
}
4

4 回答 4

14

拥有这些变量怎么样:num_events_allowed、time_before、time_now、time_passed

在初始化时,您将执行以下操作:time_before = system.timer(), num_events_allowed = n

收到事件后,请执行以下操作:

  time_now = system.timer()
  time_passed = time_now - time_before
  time_before = time_now

  num_events_allowed += time_passed * (n / deltaT);

  if num_events_allowed > n 
      num_events_allowed = n

  if num_events_allowed >= 1
      let event through, num_events_allowed -= 1
  else
      ignore event

这个算法的好处是 num_events_allowed 实际上是增加了自上次事件以来经过的时间以及可以接收事件的速率,这样您就可以按顺序增加每个 time_passed 可以发送的事件数保持在 n 的范围内。因此,如果您过早地收到一个事件,您会将其增加小于 1,如果它在太多时间之后您将增加超过 1。当然,如果事件通过,您将在刚刚收到事件时将津贴减 1。如果余量超过了 n 的最大事件,则将其返回到 n ,因为在任何时间阶段都不能允许超过 n 。如果配额小于1,则不能发送整个事件,不要让它通过!

这是漏桶算法:https ://en.wikipedia.org/wiki/Leaky_bucket

于 2012-09-21T14:02:24.973 回答
3

为每个传入请求保留滑动窗口并使其保持 O(1) + 非常小的 O(n) 的一种方法是制作一个合适大小的整数数组并将其保持为循环缓冲区并将传入请求离散化(请求为与 A/D 转换器中的采样电平集成,或者如果您是统计学家,则作为直方图集成)并跟踪循环缓冲区的总和,如下所示

assumptions: 
"there can be no more than 1000 request per minute" and 
"we discretize on every second"

int[] buffer = new zeroed int-array with 60 zeroes
int request-integrator = 0 (transactional)
int discretizer-integrator = 0 (transactional)

for each request:
    check if request-integrator < 1000 then
         // the following incs could be placed outside 
         // the if statement for saturation on to many
         // requests (punishment)
         request-integrator++                     // important
         discretizer-integrator++
         proceed with request

once every second:                    // in a transactional memory transaction, for God's saké 
    buffer-index++
    if (buffer-index = 60) then buffer-index=0    // for that circular buffer feeling!
    request-integrator -= buffer[buffer-index]    // clean for things happening one minute ago
    buffer[buffer-index] = discretizer-integrator // save this times value
    discretizer-integrator = 0                    // resetting for next sampling period

请注意,请求积分器的增加“可以”每秒只进行一次,但这会留下一个漏洞,以便在一秒钟内用 1000 个或更糟的请求饱和它,大约每分钟一次,具体取决于惩罚行为。

于 2012-09-26T22:37:39.227 回答
2

在阅读有关该问题的各种可能解决方案时。我遇到了令牌桶算法(http://en.wikipedia.org/wiki/Token_bucket)。如果我完全理解你的问题,你可以实现一个令牌桶算法,而实际上没有一个带有 N 个令牌的桶,而是采用一个可以相应地递增和递减的计数器。像

syncronized def get_token = 
    if count>0 
       { --count, return true }
    else return false

syncronized def add_token = 
    if count==N
       return;
    else ++count

现在剩下的部分是在 deltaT/r 时间内重复调用 add_token。

为了使其完全线程安全,我们需要一个原子引用来计数。但是上面的代码是为了展示在 O(1) 内存中执行它的基本思想。

于 2012-09-21T14:19:56.463 回答
1

我写了下面的类(ActionQueue)来限制函数调用的频率。一件好事是它使用计时器将事物从队列中弹出......所以CPU的使用最少(如果队列为空,甚至根本不使用)......与任何轮询类型的技术相反.

例子...

    // limit to two call every five seconds
    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(5), 2);
    public void Test()
    {
        for (var i = 0; i < 10; i++)
        {
            _actionQueue.Enqueue((i2) =>
            {
                Console.WriteLineAction " + i2 + ": " + DateTime.UtcNow);
            }, i);
        }
    }

现实世界的例子...

    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(1), 10);

    public override void SendOrderCancelRequest(Order order, SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var parms = (Tuple<Order, SessionID>)state;
            base.SendOrderCancelRequest(parms.Item1, parms.Item2);
        }, new Tuple<Order, SessionID>(order, sessionID));
    }
    public override void SendOrderMassStatusRequest(SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var sessionID2 = (SessionID)state;
            base.SendOrderMassStatusRequest(sessionID2);
        }, sessionID);
    }

实际上课...

public class ActionQueue
{
    private class ActionState
    {
        public Action<object> Action;
        public object State;
        public ActionState(Action<object> action, object state)
        {
            Action = action;
            State = state;
        }
    }
    Queue<ActionState> _actions = new Queue<ActionState>();
    Queue<DateTime> _times = new Queue<DateTime>();

    TimeSpan _timeSpan;
    int _maxActions;
    public ActionQueue(TimeSpan timeSpan, int maxActions)
    {
        _timeSpan = timeSpan;
        _maxActions = maxActions;           
    }
    public void Enqueue(Action<object> action, object state)
    {
        lock (_times)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);

            if (_times.Count <= _maxActions)
                action(state);
            else
                _actions.Enqueue(new ActionState(action, state));

            CreateDequeueTimerIfNeeded();
        }
    }

    System.Threading.Timer _dequeueTimer;
    protected void CreateDequeueTimerIfNeeded()
    {
        // if we have no timer and we do have times, create a timer
        if (_dequeueTimer == null && _times.Count > 0) 
        {
            var timeSpan = _times.Peek() - DateTime.UtcNow;
            if (timeSpan.TotalSeconds <= 0)
            {
                HandleTimesQueueChange();
            }
            else
            {
                _dequeueTimer = new System.Threading.Timer((obj) =>
                {
                    lock (_times)
                    {
                        _dequeueTimer = null;
                        HandleTimesQueueChange();
                    }
                }, null, timeSpan, System.Threading.Timeout.InfiniteTimeSpan);
            }
        }
    }

    private void HandleTimesQueueChange()
    {
        _times.Dequeue();
        while (_times.Count > 0 && _times.Peek() < DateTime.UtcNow)
            _times.Dequeue();

        while (_actions.Count > 0 && _times.Count < _maxActions)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);
            var actionState = _actions.Dequeue();
            actionState.Action(actionState.State);
        }

        CreateDequeueTimerIfNeeded();
    }
}
于 2016-12-28T22:50:03.553 回答