3

使用 TPL / Parallel.ForEach 有一种开箱即用的方法来限制每单位时间调用方法的次数(即每秒不超过 50 次调用)。这与限制线程数不同。也许有一些简单的技巧可以使这项工作?

4

3 回答 3

2

一种解决方案是制作以下https://stackoverflow.com/a/7728872/356790的线程安全版本

/// <summary>
/// This class limits the number of requests (method calls, events fired, etc.) that can occur in a given unit of time.
/// </summary>
class RequestLimiter
{

    #region Constructors

    /// <summary>
    /// Initializes an instance of the RequestLimiter class.
    /// </summary>
    /// <param name="maxRequests">The maximum number of requests that can be made in a given unit of time.</param>
    /// <param name="timeSpan">The unit of time that the maximum number of requests is limited to.</param>
    /// <exception cref="ArgumentException">maxRequests &lt;= 0</exception>
    /// <exception cref="ArgumentException">timeSpan.TotalMilliseconds &lt;= 0</exception>
    public RequestLimiter( int maxRequests , TimeSpan timeSpan )
    {
        // check parameters
        if ( maxRequests <= 0 )
        {
            throw new ArgumentException( "maxRequests <= 0" , "maxRequests" );
        }
        if ( timeSpan.TotalMilliseconds <= 0 )
        {
            throw new ArgumentException( "timeSpan.TotalMilliseconds <= 0" , "timeSpan" );
        }

        // initialize instance vars
        _maxRequests = maxRequests;
        _timeSpan = timeSpan;
        _requestTimes = new Queue<DateTime>( maxRequests );

        // sleep for 1/10th timeSpan
        _sleepTimeInMs = Convert.ToInt32( Math.Ceiling( timeSpan.TotalMilliseconds / 10 ) );
    }

    #endregion

    /// <summary>
    /// Waits until an request can be made
    /// </summary>
    public void WaitUntilRequestCanBeMade()
    {
        while ( !TryEnqueueRequest() )
        {
            Thread.Sleep( _sleepTimeInMs );
        }
    }

    #region Private Members

    private readonly Queue<DateTime> _requestTimes;
    private readonly object _requestTimesLock = new object();
    private readonly int _maxRequests;
    private readonly TimeSpan _timeSpan;
    private readonly int _sleepTimeInMs;

    /// <summary>
    /// Remove requests that are older than _timeSpan
    /// </summary>
    private void SynchronizeQueue()
    {
        while ( ( _requestTimes.Count > 0 ) && ( _requestTimes.Peek().Add( _timeSpan ) < DateTime.Now ) )
        {
            _requestTimes.Dequeue();
        }
    }

    /// <summary>
    /// Attempts to enqueue a request.
    /// </summary>
    /// <returns>
    /// Returns true if the request was successfully enqueued.  False if not.
    /// </returns>
    private bool TryEnqueueRequest()
    {
        lock ( _requestTimesLock )
        {
            SynchronizeQueue();
            if ( _requestTimes.Count < _maxRequests )
            {
                _requestTimes.Enqueue( DateTime.Now );
                return true;
            }
            return false;
        }
    }

    #endregion

}
于 2013-05-06T14:00:31.773 回答
0

使用 Timer 的现成代码示例:

使用 Reactive Extensions (Rx) 的代码示例/示例:

于 2013-05-06T15:51:24.263 回答
0

此解决方案在每个线程的启动之间强制执行延迟,并可用于满足您的要求。

    private SemaphoreSlim CooldownLock = new SemaphoreSlim(1, 1);
    private DateTime lastAction;

    private void WaitForCooldown(TimeSpan delay)
    {
        CooldownLock.Wait();

        var waitTime = delay - (DateTime.Now - lastAction);

        if (waitTime > TimeSpan.Zero)
        {
            Task.Delay(waitTime).Wait();
            lastAction = DateTime.Now;
        }

        lastAction = DateTime.Now;

        CooldownLock.Release();
    }

    public void Execute(Action[] actions, int concurrentThreadLimit, TimeSpan threadDelay)
    {
        if (actions.Any())
        {
            Parallel.ForEach(actions, 
                             new ParallelOptions() { MaxDegreeOfParallelism = concurrentThreadLimit}, 
                            (currentAction) =>
                            {
                                WaitForCooldown(threadDelay);
                                currentAction();
                            });
        }
    }
于 2017-07-26T16:15:29.840 回答