2

我已经使用Dns.BeginGetHostEntry方法根据主机名获取主机的 FQDN(主机名列表存储在 SQL Server 数据库中)。此方法(异步)在不到 30 分钟的时间内完成了近 150k 条记录的运行,并更新了存储主机名的同一个 SQL 表中的 FQDN。

此解决方案运行速度过快(超过每秒 300 个请求的阈值)。由于允许没有。对服务器生成的请求是有限的,我的服务器列在最重要的谈话者中,并请求停止此应用程序的运行。我必须重建这个应用程序才能同步运行,现在需要 6 个多小时才能完成。

//// TotalRecords are fetched from SQL database with the Hostname (referred as host further)
for (int i = 0; i < TotalRecords.Rows.Count; i++)
{
    try
    {
        host = TotalRecords.Rows[i].ItemArray[0].ToString();
        Interlocked.Increment(ref requestCounter);
        string[] arr = new string[] { i.ToString(), host }; 
        Dns.BeginGetHostEntry(host, GetHostEntryCallback,arr);
    }
    catch (Exception ex)
    {
        log.Error("Unknown error occurred\n ", ex);
    }
}
do
{
    Thread.Sleep(0);

} while (requestCounter>0);

ListAdapter.Update(TotalRecords);

问题:

  1. 有什么办法可以限制每秒生成的请求数吗?

  2. 我的理解是ParallelOptions.MaxDegreeOfParallelism不控制每秒线程数,那么有什么方法TPL可以成为更好的选择吗?这可以限制为否。每秒请求数?

4

3 回答 3

0

使用 aSemaphoreSlim和 aTimer来限制每个周期的请求。

[DebuggerDisplay( "Current Count = {_semaphore.CurrentCount}" )]
public class TimedSemaphoreSlim : IDisposable
{
    private readonly System.Threading.SemaphoreSlim _semaphore;
    private readonly System.Threading.Timer _timer;
    private int _releaseCount;

    public TimedSemaphoreSlim( int initialcount, TimeSpan period )
    {
        _semaphore = new System.Threading.SemaphoreSlim( initialcount );
        _timer = new System.Threading.Timer( OnTimer, this, period, period );
    }

    public TimedSemaphoreSlim( int initialCount, int maxCount, TimeSpan period )
    {
        _semaphore = new SemaphoreSlim( initialCount, maxCount );
        _timer = new Timer( OnTimer, this, period, period );
    }

    private void OnTimer( object state )
    {
        var releaseCount = Interlocked.Exchange( ref _releaseCount, 0 );
        if ( releaseCount > 0 )
            _semaphore.Release( releaseCount );
    }

    public WaitHandle AvailableWaitHandle => _semaphore.AvailableWaitHandle;
    public int CurrentCount => _semaphore.CurrentCount;

    public void Release()
    {
        Interlocked.Increment( ref _releaseCount );
    }

    public void Release( int releaseCount )
    {
        Interlocked.Add( ref _releaseCount, releaseCount );
    }

    public void Wait()
    {
        _semaphore.Wait();
    }

    public void Wait( CancellationToken cancellationToken )
    {
        _semaphore.Wait( cancellationToken );
    }

    public bool Wait( int millisecondsTimeout )
    {
        return _semaphore.Wait( millisecondsTimeout );
    }

    public bool Wait( int millisecondsTimeout, CancellationToken cancellationToken )
    {
        return _semaphore.Wait( millisecondsTimeout, cancellationToken );
    }

    public bool Wait( TimeSpan timeout, CancellationToken cancellationToken )
    {
        return _semaphore.Wait( timeout, cancellationToken );
    }

    public Task WaitAsync()
    {
        return _semaphore.WaitAsync();
    }

    public Task WaitAsync( CancellationToken cancellationToken )
    {
        return _semaphore.WaitAsync( cancellationToken );
    }

    public Task<bool> WaitAsync( int millisecondsTimeout )
    {
        return _semaphore.WaitAsync( millisecondsTimeout );
    }

    public Task<bool> WaitAsync( TimeSpan timeout )
    {
        return _semaphore.WaitAsync( timeout );
    }

    public Task<bool> WaitAsync( int millisecondsTimeout, CancellationToken cancellationToken )
    {
        return _semaphore.WaitAsync( millisecondsTimeout, cancellationToken );
    }

    public Task<bool> WaitAsync( TimeSpan timeout, CancellationToken cancellationToken )
    {
        return _semaphore.WaitAsync( timeout, cancellationToken );
    }

    #region IDisposable Support
    private bool disposedValue = false; // Dient zur Erkennung redundanter Aufrufe.

    private void CheckDisposed()
    {
        if ( disposedValue )
        {
            throw new ObjectDisposedException( nameof( TimedSemaphoreSlim ) );
        }
    }

    protected virtual void Dispose( bool disposing )
    {
        if ( !disposedValue )
        {
            if ( disposing )
            {
                _timer.Dispose();
                _semaphore.Dispose();
            }

            disposedValue = true;
        }
    }

    public void Dispose()
    {
        Dispose( true );
    }
    #endregion
}

示例使用

IEnumerable<string> bunchOfHosts = GetBunchOfHosts();
IList<IPHostEntry> result;

using ( var limiter = new TimedSemaphoreSlim( 300, 300, TimeSpan.FromSeconds( 1 ) ) )
{
    result = bunchOfHosts.AsParallel()
        .Select( e =>
        {
            limiter.Wait();
            try
            {
                return Dns.GetHostEntry( e );
            }
            finally
            {
                limiter.Release();
            }
        } )
        .ToList();
}
于 2017-05-22T08:50:02.140 回答
0

一个纯粹的异步解决方案。

它使用一个 nuget 包Nite.AsyncExSystem.Reactive它执行错误处理并提供 DNS 的结果,因为它们作为一个IObservable<IPHostEntry>

这里发生了很多事情。您需要将响应式扩展理解为标准异步编程。可能有很多方法可以实现以下结果,但这是一个有趣的解决方案。

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Threading;

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

public static class EnumerableExtensions
{
    public static IEnumerable<Func<U>> Defer<T, U>
        ( this IEnumerable<T> source, Func<T, U> selector) 
        => source.Select(s => (Func<U>)(() => selector(s)));
}


public class Program
{
    /// <summary>
    /// Returns the time to wait before processing another item
    /// if the rate limit is to be maintained
    /// </summary>
    /// <param name="desiredRateLimit"></param>
    /// <param name="currentItemCount"></param>
    /// <param name="elapsedTotalSeconds"></param>
    /// <returns></returns>
    private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
    {
        var time = elapsedTotalSeconds;
        var timeout = currentItemCount / desiredRateLimit;
        return timeout - time;
    }

    /// <summary>
    /// Consume the tasks in parallel but with a rate limit. The results
    /// are returned as an observable.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="tasks"></param>
    /// <param name="rateLimit"></param>
    /// <returns></returns>
    public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit){
        var s = System.Diagnostics.Stopwatch.StartNew();
        var n = 0;
        var sem = new  AsyncCountdownEvent(1);

        var errors = new ConcurrentBag<Exception>();

        return Observable.Create<T>
            ( observer =>
            {

                var ctx = new CancellationTokenSource();
                Task.Run
                    ( async () =>
                    {
                        foreach (var taskFn in tasks)
                        {
                            n++;
                            ctx.Token.ThrowIfCancellationRequested();

                            var elapsedTotalSeconds = s.Elapsed.TotalSeconds;
                            var delay = Delay( rateLimit, n, elapsedTotalSeconds );
                            if (delay > 0)
                                await Task.Delay( TimeSpan.FromSeconds( delay ), ctx.Token );

                            sem.AddCount( 1 );
                            Task.Run
                                ( async () =>
                                {
                                    try
                                    {
                                        observer.OnNext( await taskFn() );
                                    }
                                    catch (Exception e)
                                    {
                                        errors.Add( e );
                                    }
                                    finally
                                    {
                                        sem.Signal();
                                    }
                                }
                                , ctx.Token );
                        }
                        sem.Signal();
                        await sem.WaitAsync( ctx.Token );
                        if(errors.Count>0)
                            observer.OnError(new AggregateException(errors));
                        else
                            observer.OnCompleted();
                    }
                      , ctx.Token );

                return Disposable.Create( () => ctx.Cancel() );
            } );
    }

    #region hosts



    public static string [] Hosts = new [] { "google.com" }

    #endregion


    public static void Main()
    {
        var s = System.Diagnostics.Stopwatch.StartNew();

        var rate = 25;

        var n = Hosts.Length;

        var expectedTime = n/rate;

        IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer( async host =>
        {
            try
            {
                return await Dns.GetHostEntryAsync( host );
            }
            catch (Exception e)
            {
                throw new Exception($"Can't resolve {host}", e);
            }
        } );

        IObservable<IPHostEntry> results = RateLimit( dnsTaskFactories, rate );

        results
            .Subscribe( result =>
            {
                Console.WriteLine( "result " + DateTime.Now + " " + result.AddressList[0].ToString() );
            },
            onCompleted: () =>
            {
                Console.WriteLine( "Completed" );

                PrintTimes( s, expectedTime );
            },
            onError: e =>
            {
                Console.WriteLine( "Errored" );

                PrintTimes( s, expectedTime );

                if (e is AggregateException ae)
                {
                    Console.WriteLine( e.Message );
                    foreach (var innerE in ae.InnerExceptions)
                    {
                        Console.WriteLine( $"     " + innerE.GetType().Name + " " + innerE.Message );
                    }
                }
                else
                {
                        Console.WriteLine( $"got error " + e.Message );
                }
            }

            );

        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }

    private static void PrintTimes(Stopwatch s, int expectedTime)
    {
        Console.WriteLine( "Done" );
        Console.WriteLine( "Elapsed Seconds " + s.Elapsed.TotalSeconds );
        Console.WriteLine( "Expected Elapsed Seconds " + expectedTime );
    }
}

最后几行输出是

result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 157.7.105.52
result 5/23/2017 3:23:36 PM 223.223.182.225
result 5/23/2017 3:23:36 PM 64.34.93.5
result 5/23/2017 3:23:36 PM 212.83.211.103
result 5/23/2017 3:23:36 PM 205.185.216.10
result 5/23/2017 3:23:36 PM 198.232.125.32
result 5/23/2017 3:23:36 PM 66.231.176.100
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:37 PM 219.84.203.116
Errored
Done
Elapsed Seconds 19.9990118
Expected Elapsed Seconds 19
One or more errors occurred.
     Exception Can't resolve adv758968.ru
     Exception Can't resolve fr.a3dfp.net
     Exception Can't resolve ads.adwitserver.com
     Exception Can't resolve www.adtrader.com
     Exception Can't resolve trak-analytics.blic.rs
     Exception Can't resolve ads.buzzcity.net

我无法粘贴完整的代码,所以这里是带有主机列表的代码的链接。

https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190

于 2017-05-22T09:30:48.657 回答
0

你有没有考虑过使用TPL Dataflow图书馆?它有一种非常方便的方式来限制同类型的并发操作。它还有机会通过限制缓冲区大小来限制整个管道。

基本上,您需要创建的只是一个管道:

所以你的代码可能是这样的:

// buffer limited to 30 items in queue
// all other items would be postponed and added to queue automatically
// order in queue is preserved
var hosts = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 30 });

// get a host and perform a dns search operation
var handler = new TransformBlock<string, IPHostEntry>(host => Dns.GetHostEntry(host),
  // no more than 5 simultaneous requests at a time
  new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

// gather results in an array of size 500 
var batchBlock = new BatchBlock<IPHostEntry>(500);

// get the resulting array and save it to database
var batchSave = new ActionBlock<IPHostEntry[]>(r => GetHostEntryCallback(r));

// link all the blocks to automatically propagate items along the pipeline
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
hosts.LinkTo(handler, linkOptions);
handler.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(batchSave, linkOptions);

// provide the data to pipeline
for (var i = 0; i < TotalRecords.Rows.Count; ++i)
{
    var host = TotalRecords.Rows[i].ItemArray[0].ToString();
    // async wait for item to be sent to pipeline
    // will throttle starting with 31th item in a buffer queue
    await hosts.SendAsync(host);
}

// pipeline is complete now, just wait it finishes
hosts.Complete();

// wait for the last block to finish it's execution
await batchSave.Completion;

// notify user that update is over

我鼓励您阅读How-toMSDN 上的整个部分,以更好地了解您可以使用此库做什么,也许可以继续阅读官方文档

顺便说一句,您可以使用SqlBulkCopy该类来更新数据库,如果它符合您的要求,通常它比使用SqlDataAdapter.

于 2017-05-22T16:55:51.600 回答