0

我在 Azure 上实现了一个工作角色,它需要从 Azure 存储中删除 blob。假设我的 blob 列表有大约 10K 项。

最简单的同步方法可能是:

Parallel.ForEach(list, x => ((CloudBlob) x).Delete());

要求:

  • 我想异步实现相同的东西(在单个线程上)。

  • 我想将并发连接数限制为 50 - 所以当同时只执行 50 个异步连接时,我将执行 10K 删除。如果一个删除完成,可以开始一个新的。

解决方案?

到目前为止,在阅读了这个问题这个问题之后,似乎TPL Dataflow是要走的路。

这是一个如此简单的问题,数据流似乎有点过头了。有没有更简单的选择?

如果没有,这将如何使用数据流来实现?据我了解,我需要一个执行async删除的操作块(我需要await吗?)。创建我的块时,我应该设置MaxDegreeOfParallelism为 50。然后我需要将我的 10K blob 从列表中发布到块,然后使用block.Completion.Wait(). 它是否正确?

4

2 回答 2

4

对于这么简单的事情, aSemaphoreSlim就足够了。TPL 数据流很棒,特别是如果您希望将工作限制在较大管道的一部分中。但是,在您的场景中,这听起来更像是您确实只需要限制一项操作。

异步执行非常简单:

var semaphore = new SemaphoreSlim(50);
var tasks = list.Cast<CloudBlob>().Select(async x =>
{
    using (await semaphore.TakeAsync())
        await x.DeleteAsync();
});
await Task.WhenAll(tasks);

其中TakeAsync定义为:

private sealed class SemaphoreSlimKey : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    public SemaphoreSlimKey(SemaphoreSlim semaphore) { _semaphore = semaphore; }
    void IDisposable.Dispose() { _semaphore.Release(); }
}

public static async Task<IDisposable> TakeAsync(this SemaphoreSlim semaphore)
{
    await semaphore.WaitAsync().ConfigureAwait(false);
    return new SemaphoreSlimKey(semaphore);
}
于 2013-09-08T20:24:48.480 回答
0

您可以考虑使用限制并行度的任务计划程序:http: //msdn.microsoft.com/en-us/library/ee789351.aspx。无限并行可能会导致限制,因为您可能会立即对服务器进行太多并发请求的 DOS,因此被认为是在应用程序层限制这一点的最佳实践。

请注意,2.1 存储客户端支持具有抢先取消等功能的任务,以使其更容易。您上面编写的代码将需要 50 个线程,因为它正在调用同步方法。您可以使用新的 DeleteAsync 方法在单个线程上完全异步执行此操作。如果您要利用 Async await,维护 50 个并发请求将非常简单,因为您可以在循环中执行 await any,只需添加一个额外的工作项等。

本次演讲涵盖了一些最佳实践,可能对您有所帮助: http: //channel9.msdn.com/Events/TechEd/NorthAmerica/2013/WAD-B406

于 2013-09-16T22:46:26.663 回答