并行检索 60 行的最快方法是什么,其中每行驻留在 Azure 表存储的不同分区中?
我尝试了以下几个选项,其中使用异步调用的选项(第 1 到第 4 个)耗时相近,获取 60 行都需要大约 2 到 3 秒。
- 通过异步 Query.Execute 使用多个任务
- 使用具有 Interlocked.Increment 和异步 Query.Execute 的 ManualResetEvent
- 使用带有异步 Query.Execute 的 ManualResetEvents 列表
- 带有 ManualResetEvents 列表并使用异步 Query.Execute 的 Parallel.ForEach 循环
- 带有阻塞 Query.Execute 的 Parallel.ForEach
仅供参考,我使用的是 Azure SDK 1.7 和 .NET 4。
我为每个选项使用的示例代码:
通过异步 Query.Execute 使用多个任务
/// <summary>
/// Issues one asynchronous table query per row key, wraps each one in a task, waits for
/// all of them and returns the row keys of the entries that were found.
/// </summary>
/// <remarks>
/// Despite the method name, only the first 60 keys of <c>_rowKeysToGet</c> are requested.
/// Every query filters on the same <c>_partitionKey</c> and runs on its own
/// <c>LogServiceContext</c> instance.
/// NOTE(review): outbound HTTP concurrency is presumably capped by
/// ServicePointManager.DefaultConnectionLimit; confirm it is raised before comparing timings.
/// </remarks>
/// <returns>Row keys of the entries that exist; keys with no matching row are omitted.</returns>
/// <exception cref="AggregateException">Thrown by <c>Task.WaitAll</c> when any query fails.</exception>
public List<string> Get100RowsUsingTaskWaitAllWithAsyncExecute()
{
    var result = new ConcurrentBag<LogEntry>();
    var tasks = new List<Task>();

    // Parsing the connection string does not depend on the row key, so do it once.
    var cloudStorageAccount = CloudStorageAccount.Parse(
        ConfigurationManager.AppSettings["LoggingConnectionString"]);
    var tableEndpoint = cloudStorageAccount.TableEndpoint.AbsoluteUri;

    foreach (var eachId in _rowKeysToGet.Take(60))
    {
        // One context per query, as in the original code.
        var ctx = new LogServiceContext(tableEndpoint, cloudStorageAccount.Credentials);

        var query = (from e in ctx.LogEntries
                     where e.RowKey == eachId && e.PartitionKey == _partitionKey
                     select e).AsTableServiceQuery<LogEntry>();

        // NOTE(review): with method-group arguments this call appears to bind to the
        // FromAsync(begin, end, object state) overload, so the enum value travels as the
        // async state object rather than as task creation options - confirm.
        var fetch = Task<ResultSegment<LogEntry>>.Factory.FromAsync(
            query.BeginExecuteSegmented,
            query.EndExecuteSegmented,
            TaskCreationOptions.None);

        tasks.Add(fetch.ContinueWith(t =>
        {
            // Reading t.Result rethrows a query failure, which faults this continuation
            // and is surfaced by Task.WaitAll below.
            var entry = t.Result.Results.FirstOrDefault();

            // A key with no matching row yields null; storing it would make the
            // RowKey projection below throw NullReferenceException.
            if (entry != null)
            {
                result.Add(entry);
            }
        }));
    }

    Task.WaitAll(tasks.ToArray());

    return result.Select(r => r.RowKey).ToList();
}
使用具有 Interlocked.Increment 和异步 Query.Execute 的 ManualResetEvent
/// <summary>
/// Issues one asynchronous table query per row key and blocks on a single
/// <see cref="ManualResetEvent"/> that is signalled when the outstanding-operation counter
/// drops to zero. Returns the row keys of the entries that were found.
/// </summary>
/// <remarks>
/// Despite the method name, only the first 60 keys of <c>_rowKeysToGet</c> are requested.
/// Every query filters on the same <c>_partitionKey</c> and all queries share one
/// <c>LogServiceContext</c>.
/// NOTE(review): the shared context is touched from several completion callbacks at once;
/// confirm that is safe for this context type.
/// </remarks>
/// <returns>Row keys of the entries that exist; keys with no matching row are omitted.</returns>
/// <exception cref="AggregateException">Thrown after all callbacks finished when any query failed.</exception>
public List<string> Get100RowsUsingOneManualResetEventWithAsyncExecute()
{
    var result = new ConcurrentBag<LogEntry>();
    var failures = new ConcurrentQueue<Exception>();

    var cloudStorageAccount = CloudStorageAccount.Parse(
        ConfigurationManager.AppSettings["LoggingConnectionString"]);
    var ctx = new LogServiceContext(
        cloudStorageAccount.TableEndpoint.AbsoluteUri,
        cloudStorageAccount.Credentials);

    // Start at 1: the extra count is held by this dispatching thread until every query has
    // been issued. Without it, a fast callback could bring the counter to zero (and signal
    // the event) while later queries are still being started, and an empty key set would
    // never signal the event at all.
    var pending = 1;

    using (var finished = new ManualResetEvent(false))
    {
        foreach (var eachId in _rowKeysToGet.Take(60))
        {
            Interlocked.Increment(ref pending);

            try
            {
                var query = (from e in ctx.LogEntries
                             where e.RowKey == eachId && e.PartitionKey == _partitionKey
                             select e).AsTableServiceQuery<LogEntry>();

                query.BeginExecuteSegmented(ar =>
                {
                    try
                    {
                        var response = ((CloudTableQuery<LogEntry>)ar.AsyncState)
                            .EndExecuteSegmented(ar);
                        var entry = response.Results.FirstOrDefault();

                        // A missing row yields null; skip it so the RowKey projection
                        // at the end cannot throw NullReferenceException.
                        if (entry != null)
                        {
                            result.Add(entry);
                        }
                    }
                    catch (Exception ex)
                    {
                        // Record instead of letting it escape on the callback thread.
                        failures.Enqueue(ex);
                    }
                    finally
                    {
                        // Always count the operation as done, even on failure, so the
                        // waiting thread cannot hang.
                        if (Interlocked.Decrement(ref pending) == 0)
                        {
                            finished.Set();
                        }
                    }
                }, query);
            }
            catch (Exception ex)
            {
                // The query never started, so no callback will release its count.
                failures.Enqueue(ex);
                Interlocked.Decrement(ref pending);
                break;
            }
        }

        // Release the dispatcher's own count; if every callback already ran, signal here.
        if (Interlocked.Decrement(ref pending) == 0)
        {
            finished.Set();
        }

        // Waiting inside the using block guarantees no callback calls Set() on a
        // disposed event.
        finished.WaitOne();
    }

    if (!failures.IsEmpty)
    {
        throw new AggregateException(failures);
    }

    return result.Select(r => r.RowKey).ToList();
}
使用带有异步 Query.Execute 的 ManualResetEvents 列表
/// <summary>
/// Issues one asynchronous table query per row key, gives each query its own
/// <see cref="ManualResetEvent"/>, waits until every event is signalled and returns the
/// row keys of the entries that were found.
/// </summary>
/// <remarks>
/// Despite the method name, only the first 60 keys of <c>_rowKeysToGet</c> are requested.
/// Every query filters on the same <c>_partitionKey</c> and all queries share one
/// <c>LogServiceContext</c>.
/// NOTE(review): the shared context is touched from several completion callbacks at once;
/// confirm that is safe for this context type.
/// </remarks>
/// <returns>Row keys of the entries that exist; keys with no matching row are omitted.</returns>
/// <exception cref="AggregateException">Thrown after all callbacks finished when any query failed.</exception>
public List<string> Get100RowsUsingListOfManualResetEventsWithAsyncExecute()
{
    var result = new ConcurrentBag<LogEntry>();
    var failures = new ConcurrentQueue<Exception>();

    var cloudStorageAccount = CloudStorageAccount.Parse(
        ConfigurationManager.AppSettings["LoggingConnectionString"]);
    var ctx = new LogServiceContext(
        cloudStorageAccount.TableEndpoint.AbsoluteUri,
        cloudStorageAccount.Credentials);

    var events = new List<ManualResetEvent>();

    try
    {
        foreach (var eachId in _rowKeysToGet.Take(60))
        {
            var evt = new ManualResetEvent(false);

            // Register the handle before starting the query so it is always disposed.
            events.Add(evt);

            try
            {
                var query = (from e in ctx.LogEntries
                             where e.RowKey == eachId && e.PartitionKey == _partitionKey
                             select e).AsTableServiceQuery<LogEntry>();

                query.BeginExecuteSegmented(ar =>
                {
                    try
                    {
                        var response = ((CloudTableQuery<LogEntry>)ar.AsyncState)
                            .EndExecuteSegmented(ar);
                        var entry = response.Results.FirstOrDefault();

                        // A missing row yields null; skip it so the RowKey projection
                        // at the end cannot throw NullReferenceException.
                        if (entry != null)
                        {
                            result.Add(entry);
                        }
                    }
                    catch (Exception ex)
                    {
                        // Record instead of letting it escape on the callback thread.
                        failures.Enqueue(ex);
                    }
                    finally
                    {
                        // Signal even on failure so the waiting thread cannot hang.
                        evt.Set();
                    }
                }, query);
            }
            catch (Exception ex)
            {
                // The query never started, so no callback will signal this handle.
                failures.Enqueue(ex);
                evt.Set();
                break;
            }
        }

        // Wait on the handles one by one instead of WaitHandle.WaitAll: WaitAll rejects
        // an empty array, is capped at 64 handles and is not supported on STA threads.
        foreach (var handle in events)
        {
            handle.WaitOne();
        }
    }
    finally
    {
        foreach (var handle in events)
        {
            handle.Dispose();
        }
    }

    if (!failures.IsEmpty)
    {
        throw new AggregateException(failures);
    }

    return result.Select(r => r.RowKey).ToList();
}
带有 ManualResetEvents 列表并使用异步 Query.Execute 的 Parallel.ForEach 循环
/// <summary>
/// Uses <c>Parallel.ForEach</c> to start one asynchronous table query per row key, each
/// with its own <see cref="ManualResetEvent"/>, waits until every event is signalled and
/// returns the row keys of the entries that were found.
/// </summary>
/// <remarks>
/// Despite the method name, only the first 60 keys of <c>_rowKeysToGet</c> are requested.
/// Every query filters on the same <c>_partitionKey</c> and all queries share one
/// <c>LogServiceContext</c>.
/// NOTE(review): the shared context is used from several worker and callback threads at
/// once; confirm that is safe for this context type.
/// </remarks>
/// <returns>Row keys of the entries that exist; keys with no matching row are omitted.</returns>
/// <exception cref="AggregateException">Thrown after all callbacks finished when any query failed.</exception>
public List<string> Get100RowsUsingParallelForEachAndManualResetEventWithAsyncExecute()
{
    var result = new ConcurrentBag<LogEntry>();
    var failures = new ConcurrentQueue<Exception>();

    var cloudStorageAccount = CloudStorageAccount.Parse(
        ConfigurationManager.AppSettings["LoggingConnectionString"]);
    var ctx = new LogServiceContext(
        cloudStorageAccount.TableEndpoint.AbsoluteUri,
        cloudStorageAccount.Credentials);

    // The loop body runs on several threads, so the handle collection must be
    // thread-safe; List<T>.Add is not and can lose or corrupt entries under contention.
    var events = new ConcurrentBag<ManualResetEvent>();

    try
    {
        Parallel.ForEach(_rowKeysToGet.Take(60), eachId =>
        {
            var evt = new ManualResetEvent(false);

            // Register the handle before starting the query so it is always disposed.
            events.Add(evt);

            try
            {
                var query = (from e in ctx.LogEntries
                             where e.RowKey == eachId && e.PartitionKey == _partitionKey
                             select e).AsTableServiceQuery<LogEntry>();

                query.BeginExecuteSegmented(ar =>
                {
                    try
                    {
                        var response = ((CloudTableQuery<LogEntry>)ar.AsyncState)
                            .EndExecuteSegmented(ar);
                        var entry = response.Results.FirstOrDefault();

                        // A missing row yields null; skip it so the RowKey projection
                        // at the end cannot throw NullReferenceException.
                        if (entry != null)
                        {
                            result.Add(entry);
                        }
                    }
                    catch (Exception ex)
                    {
                        // Record instead of letting it escape on the callback thread.
                        failures.Enqueue(ex);
                    }
                    finally
                    {
                        // Signal even on failure so the waiting thread cannot hang.
                        evt.Set();
                    }
                }, query);
            }
            catch (Exception ex)
            {
                // The query never started, so no callback will signal this handle.
                failures.Enqueue(ex);
                evt.Set();
            }
        });

        // Wait on the handles one by one instead of WaitHandle.WaitAll: WaitAll rejects
        // an empty array, is capped at 64 handles and is not supported on STA threads.
        foreach (var handle in events)
        {
            handle.WaitOne();
        }
    }
    finally
    {
        foreach (var handle in events)
        {
            handle.Dispose();
        }
    }

    if (!failures.IsEmpty)
    {
        throw new AggregateException(failures);
    }

    return result.Select(r => r.RowKey).ToList();
}
带有阻塞 Query.Execute 的 Parallel.ForEach
/// <summary>
/// Uses <c>Parallel.ForEach</c> to run one blocking table query per row key and returns
/// the row keys of the entries that were found.
/// </summary>
/// <remarks>
/// Despite the method name, only the first 60 keys of <c>_rowKeysToGet</c> are requested.
/// Every query filters on the same <c>_partitionKey</c> and all queries share one
/// <c>LogServiceContext</c>.
/// NOTE(review): the shared context is used from several worker threads at once; confirm
/// that is safe for this context type.
/// </remarks>
/// <returns>Row keys of the entries that exist; keys with no matching row are omitted.</returns>
/// <exception cref="AggregateException">Thrown by <c>Parallel.ForEach</c> when any query fails.</exception>
public List<string> Get100RowsUsingParallelForEachWithBlockingExecute()
{
    var result = new ConcurrentBag<LogEntry>();

    var cloudStorageAccount = CloudStorageAccount.Parse(
        ConfigurationManager.AppSettings["LoggingConnectionString"]);
    var ctx = new LogServiceContext(
        cloudStorageAccount.TableEndpoint.AbsoluteUri,
        cloudStorageAccount.Credentials);

    Parallel.ForEach(_rowKeysToGet.Take(60), eachId =>
    {
        var query = (from e in ctx.LogEntries
                     where e.RowKey == eachId && e.PartitionKey == _partitionKey
                     select e).AsTableServiceQuery<LogEntry>();

        var entry = query.Execute().FirstOrDefault();

        // A key with no matching row yields null; storing it would make the RowKey
        // projection below throw NullReferenceException.
        if (entry != null)
        {
            result.Add(entry);
        }
    });

    return result.Select(r => r.RowKey).ToList();
}