2

并行检索 60 行的最快方法是什么,其中每行驻留在 Azure 表存储的不同分区中?

我尝试了以下几种方案,其中采用异步调用的方案(第 1 到第 4 种)耗时相近,获取 60 行都需要大约 2 到 3 秒。

  1. 通过异步 Query.Execute 使用多个任务
  2. 使用具有 Interlocked.Increment 和异步 Query.Execute 的 ManualResetEvent
  3. 使用带有异步 Query.Execute 的 ManualResetEvents 列表
  4. 带有 ManualResetEvents 列表并使用异步 Query.Execute 的 Parallel.ForEach 循环
  5. 带有阻塞 Query.Execute 的 Parallel.ForEach

仅供参考,我使用的是 Azure SDK 1.7 和 .Net 4。

我为每个选项使用的示例代码:

  1. 通过异步 Query.Execute 使用多个任务

    /// <summary>
    /// Fetches log entries by starting one asynchronous point query per row key,
    /// wrapping each Begin/End pair in a task, and blocking until all tasks finish.
    /// Despite the method name, at most 60 row keys are requested.
    /// </summary>
    /// <returns>
    /// The row keys of the entries that were found, in the order the keys were
    /// requested. Keys for which the query returned no row are skipped.
    /// </returns>
    public List<string> Get100RowsUsingTaskWaitAllWithAsyncExecute()
    {
        // The account does not depend on the row key, so parse the connection string once
        // instead of once per query.
        var cloudStorageAccount =
            CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
        var tableEndpoint = cloudStorageAccount.TableEndpoint.AbsoluteUri;
        var credentials = cloudStorageAccount.Credentials;

        var tasks = new List<Task<LogEntry>>();

        foreach (var eachId in _rowKeysToGet.Take(60))
        {
            // Copy the loop variable: before C# 5 a foreach variable is shared by the
            // closures of every iteration, and the query below captures it.
            var rowKey = eachId;

            // Each query keeps its own context, so no context is touched by two requests.
            var ctx = new LogServiceContext(tableEndpoint, credentials);

            var query = (from e in ctx.LogEntries
                         where e.RowKey == rowKey
                             && e.PartitionKey == _partitionKey
                         select e).AsTableServiceQuery<LogEntry>();

            // The continuation returns the entity (or null) so the results can be read
            // from the tasks themselves; no shared collection is needed.
            tasks.Add(
                Task<ResultSegment<LogEntry>>.Factory.FromAsync(
                    query.BeginExecuteSegmented,
                    query.EndExecuteSegmented,
                    TaskCreationOptions.None)
                .ContinueWith<LogEntry>(t => t.Result.Results.FirstOrDefault()));
        }

        // Blocks until every query has completed; failures surface here as AggregateException.
        Task.WaitAll(tasks.ToArray());

        // FirstOrDefault yields null when a row does not exist; skip those instead of
        // dereferencing them.
        return tasks
            .Select(t => t.Result)
            .Where(entry => entry != null)
            .Select(entry => entry.RowKey)
            .ToList();
    }
    
  2. 使用具有 Interlocked.Increment 和异步 Query.Execute 的 ManualResetEvent

    /// <summary>
    /// Fetches log entries by starting one asynchronous point query per row key and
    /// tracking completion with a single <see cref="ManualResetEvent"/> plus an
    /// interlocked counter of outstanding queries.
    /// Despite the method name, at most 60 row keys are requested.
    /// </summary>
    /// <returns>
    /// The row keys of the entries that were found, in no particular order.
    /// Keys for which the query returned no row are skipped.
    /// </returns>
    public List<string> Get100RowsUsingOneManualResetEventWithAsyncExecute()
    {
        var result = new ConcurrentBag<LogEntry>();

        var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
        var tableEndpoint = cloudStorageAccount.TableEndpoint.AbsoluteUri;
        var credentials = cloudStorageAccount.Credentials;

        // Start at 1, not 0. The extra count is held by this thread until every query has
        // been started, so a fast callback cannot drive the counter to zero (and release
        // the waiter) while the loop is still issuing queries. It also makes the
        // empty-input case terminate instead of waiting forever.
        var pendingCount = 1;
        using (var finished = new ManualResetEvent(false))
        {
            foreach (var eachId in _rowKeysToGet.Take(60))
            {
                // Copy the loop variable: before C# 5 a foreach variable is shared by the
                // closures of every iteration, and the query below captures it.
                var rowKey = eachId;

                // One context per query, matching the task-based variant.
                // NOTE(review): presumably LogServiceContext derives from DataServiceContext,
                // whose instance members are not documented as thread-safe - confirm.
                var ctx = new LogServiceContext(tableEndpoint, credentials);

                var query = (from e in ctx.LogEntries
                             where e.RowKey == rowKey
                                   && e.PartitionKey == _partitionKey
                             select e).AsTableServiceQuery<LogEntry>();

                Interlocked.Increment(ref pendingCount);

                query.BeginExecuteSegmented(ar =>
                {
                    try
                    {
                        var response = ((CloudTableQuery<LogEntry>)ar.AsyncState)
                            .EndExecuteSegmented(ar);

                        // FirstOrDefault yields null when the row does not exist.
                        var eachLogEntry = response.Results.FirstOrDefault();
                        if (eachLogEntry != null)
                        {
                            result.Add(eachLogEntry);
                        }
                    }
                    finally
                    {
                        // Always release this query's count, even if the query failed,
                        // so the waiting thread cannot be blocked forever.
                        if (Interlocked.Decrement(ref pendingCount) == 0)
                        {
                            finished.Set();
                        }
                    }
                }, query);
            }

            // Release the count held by this thread now that all queries are in flight.
            if (Interlocked.Decrement(ref pendingCount) == 0)
            {
                finished.Set();
            }

            finished.WaitOne();
        }

        return result.Select(r => r.RowKey).ToList();
    }
    
  3. 使用带有异步 Query.Execute 的 ManualResetEvents 列表

    /// <summary>
    /// Fetches log entries by starting one asynchronous point query per row key, giving
    /// each query its own <see cref="ManualResetEvent"/>, and waiting for every event.
    /// Despite the method name, at most 60 row keys are requested.
    /// </summary>
    /// <returns>
    /// The row keys of the entries that were found, in no particular order.
    /// Keys for which the query returned no row are skipped.
    /// </returns>
    public List<string> Get100RowsUsingListOfManualResetEventsWithAsyncExecute()
    {
        var result = new ConcurrentBag<LogEntry>();

        var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
        var tableEndpoint = cloudStorageAccount.TableEndpoint.AbsoluteUri;
        var credentials = cloudStorageAccount.Credentials;

        var events = new List<ManualResetEvent>();

        foreach (var eachId in _rowKeysToGet.Take(60))
        {
            // Copy the loop variable: before C# 5 a foreach variable is shared by the
            // closures of every iteration, and the query below captures it.
            var rowKey = eachId;

            // One context per query, matching the task-based variant.
            // NOTE(review): presumably LogServiceContext derives from DataServiceContext,
            // whose instance members are not documented as thread-safe - confirm.
            var ctx = new LogServiceContext(tableEndpoint, credentials);

            var query = (from e in ctx.LogEntries
                         where e.RowKey == rowKey
                             && e.PartitionKey == _partitionKey
                         select e).AsTableServiceQuery<LogEntry>();

            var completed = new ManualResetEvent(false);
            events.Add(completed);

            query.BeginExecuteSegmented(ar =>
            {
                try
                {
                    var response = ((CloudTableQuery<LogEntry>)ar.AsyncState)
                        .EndExecuteSegmented(ar);

                    // FirstOrDefault yields null when the row does not exist.
                    var eachLogEntry = response.Results.FirstOrDefault();
                    if (eachLogEntry != null)
                    {
                        result.Add(eachLogEntry);
                    }
                }
                finally
                {
                    // Signal even when the query failed so the waiter is never stuck.
                    completed.Set();
                }
            }, query);
        }

        // Wait on each handle in turn rather than WaitHandle.WaitAll: WaitAll rejects an
        // empty array, is limited to 64 handles, and is not supported on STA threads.
        // The total wait is the same - it ends when the slowest query has signalled.
        foreach (var pending in events)
        {
            pending.WaitOne();
        }

        events.ForEach(done => done.Dispose());

        return result.Select(r => r.RowKey).ToList();
    }
    
  4. 带有 ManualResetEvents 列表并使用异步 Query.Execute 的 Parallel.ForEach 循环

    /// <summary>
    /// Fetches log entries by starting the asynchronous point queries from inside a
    /// <see cref="Parallel"/> loop, giving each query its own
    /// <see cref="ManualResetEvent"/>, and waiting for every event.
    /// Despite the method name, at most 60 row keys are requested.
    /// </summary>
    /// <returns>
    /// The row keys of the entries that were found, in no particular order.
    /// Keys for which the query returned no row are skipped.
    /// </returns>
    public List<string> Get100RowsUsingParallelForEachAndManualResetEventWithAsyncExecute()
    {
        var result = new ConcurrentBag<LogEntry>();

        var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
        var tableEndpoint = cloudStorageAccount.TableEndpoint.AbsoluteUri;
        var credentials = cloudStorageAccount.Credentials;

        // The loop body runs on several threads at once, so the handles are collected in a
        // thread-safe bag; List<T>.Add is not safe for concurrent callers.
        var events = new ConcurrentBag<ManualResetEvent>();

        Parallel.ForEach(_rowKeysToGet.Take(60), eachId =>
        {
            // One context per query so no context instance is used by two threads.
            // NOTE(review): presumably LogServiceContext derives from DataServiceContext,
            // whose instance members are not documented as thread-safe - confirm.
            var ctx = new LogServiceContext(tableEndpoint, credentials);

            var query = (from e in ctx.LogEntries
                          where e.RowKey == eachId
                              && e.PartitionKey == _partitionKey
                          select e).AsTableServiceQuery<LogEntry>();

            var completed = new ManualResetEvent(false);
            events.Add(completed);

            query.BeginExecuteSegmented(ar =>
            {
                try
                {
                    var response = ((CloudTableQuery<LogEntry>)ar.AsyncState)
                        .EndExecuteSegmented(ar);

                    // FirstOrDefault yields null when the row does not exist.
                    var eachLogEntry = response.Results.FirstOrDefault();
                    if (eachLogEntry != null)
                    {
                        result.Add(eachLogEntry);
                    }
                }
                finally
                {
                    // Signal even when the query failed so the waiter is never stuck.
                    completed.Set();
                }
            }, query);
        });

        // Parallel.ForEach has returned, so every handle is already in the bag.
        // Wait on each handle in turn rather than WaitHandle.WaitAll: WaitAll rejects an
        // empty array, is limited to 64 handles, and is not supported on STA threads.
        foreach (var pending in events)
        {
            pending.WaitOne();
        }

        foreach (var done in events)
        {
            done.Dispose();
        }

        return result.Select(r => r.RowKey).ToList();
    }
    
  5. 带有阻塞 Query.Execute 的 Parallel.ForEach

    /// <summary>
    /// Fetches log entries by running one blocking point query per row key inside a
    /// <see cref="Parallel"/> loop.
    /// Despite the method name, at most 60 row keys are requested.
    /// </summary>
    /// <returns>
    /// The row keys of the entries that were found, in no particular order.
    /// Keys for which the query returned no row are skipped.
    /// </returns>
    public List<string> Get100RowsUsingParallelForEachWithBlockingExecute()
    {
        var result = new ConcurrentBag<LogEntry>();

        var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
        var tableEndpoint = cloudStorageAccount.TableEndpoint.AbsoluteUri;
        var credentials = cloudStorageAccount.Credentials;

        Parallel.ForEach(_rowKeysToGet.Take(60), eachId =>
        {
            // One context per query so no context instance is used by two threads.
            // NOTE(review): presumably LogServiceContext derives from DataServiceContext,
            // whose instance members are not documented as thread-safe - confirm.
            var ctx = new LogServiceContext(tableEndpoint, credentials);

            var query = (from e in ctx.LogEntries
                         where e.RowKey == eachId
                             && e.PartitionKey == _partitionKey
                         select e).AsTableServiceQuery<LogEntry>();

            // Execute() blocks the worker thread until the response has arrived.
            var eachLogEntry = query.Execute().FirstOrDefault();

            // FirstOrDefault yields null when the row does not exist; storing it would make
            // the RowKey projection below throw.
            if (eachLogEntry != null)
            {
                result.Add(eachLogEntry);
            }
        });

        return result.Select(r => r.RowKey).ToList();
    }
    
4

0 回答 0