13

问题:我想使用他们的 .NET SDK 从 AWS S3 并行下载 100 个文件。下载的内容应该存储在 100 个内存流中(文件足够小,我可以从那里获取)。我对 Task、IAsyncResult、Parallel.* 和 .NET 4.0 中的其他不同方法感到困惑。

如果我尝试自己解决问题,我会想到类似这样的伪代码:(编辑以向某些变量添加类型)

using Amazon;
using Amazon.S3;
using Amazon.S3.Model;

AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;


// Prepare to launch requests
var asyncRequests = from rq in requestObjects 
    select _s3.BeginGetObject(rq,null,null);

// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();

// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched 
    select _s3.EndGetRequest(rq);

// Finish requests
var actualResponses = responses.ToList();

// Fetch data
var data = actualResponses.Select(rp => {
    var ms = new MemoryStream(); 
    rp.ResponseStream.CopyTo(ms); 
    return ms;
});

此代码并行启动 100 个请求,这很好。但是,有两个问题:

  1. 最后一条语句将串行下载文件,而不是并行下载。流中似乎没有 BeginCopyTo()/EndCopyTo() 方法...
  2. 在所有请求都响应之前,前面的语句不会放开。换句话说,在所有文件都开始之前,所有文件都不会开始下载。

所以在这里我开始认为我走错了路......

帮助?

4

1 回答 1

22

如果您将操作分解为一个异步处理一个请求然后调用它 100 次的方法,这可能会更容易。

首先,让我们确定您想要的最终结果。由于您将使用的是 aMemoryStream这意味着您将希望Task<MemoryStream>从您的方法中返回 a 。签名将如下所示:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)

因为您的AmazonS3对象实现了异步设计模式,所以您可以使用类上的FromAsync方法从实现异步设计模式的TaskFactory中生成一个Task<T>,如下所示:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null);

    // But what goes here?

所以你已经在一个好地方,你有一个Task<T>你可以等待或在呼叫完成时得到回调的地方。但是,您需要以某种方式将GetObjectResponse调用返回的 toTask<GetObjectResponse>转换为MemoryStream.

为此,您希望在类上使用该ContinueWith方法Task<T>。将其视为类上Select方法的异步版本,它只是对另一个的投影,除了每次调用时,您都可能创建一个运行部分代码的新任务。EnumerableTask<T>ContinueWith

这样,您的方法如下所示:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    // Start the task of downloading.
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null
        );

    // Translate.
    Task<MemoryStream> translation = response.ContinueWith(t => {
        using (Task<GetObjectResponse> resp = t ){
            var ms = new MemoryStream(); 
            t.Result.ResponseStream.CopyTo(ms); 
            return ms;
        } 
    });

    // Return the full task chain.
    return translation;
}

请注意,在上面您可能会调用 pass 的重载ContinueWithTaskContinuationOptions.ExecuteSynchronously因为看起来您正在做最少的工作(我不知道,响应可能是巨大的)。如果您正在做的工作非常少,而为了完成工作而开始一项新任务是有害的,您应该通过TaskContinuationOptions.ExecuteSynchronously,这样您就不会浪费时间为最少的操作创建新任务。

现在您有了可以将一个请求转换为 的方法Task<MemoryStream>,创建一个可以处理任意数量的请求的包装器很简单:

static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
    IEnumerable<GetObjectRequest> requests)
{
    // Just call Select on the requests, passing our translation into
    // a Task<MemoryStream>.
    // Also, materialize here, so that the tasks are "hot" when
    // returned.
    return requests.Select(r => GetMemoryStreamAsync(s3, r)).
        ToArray();
}

在上面,您只需获取一系列GetObjectRequest实例,它将返回一个Task<MemoryStream>. 它返回一个物化序列这一事实很重要。如果您在返回之前未将其具体化,则在迭代序列之前不会创建任务。

当然,如果您想要这种行为,那么无论如何,只需删除对 的调用.ToArray(),让方法返回IEnumerable<Task<MemoryStream>>,然后在您遍历任务时发出请求。

从那里,您可以一次处理它们(使用循环中的Task.WaitAny方法)或等待所有它们完成(通过调用Task.WaitAll方法)。后者的一个例子是:

static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests)
{
    Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
    Task.WaitAll(tasks);
    return tasks.Select(t => t.Result).ToList();
}

另外,应该提到的是,这非常适合Reactive Extensions 框架,因为这非常适合IObservable<T>实现。

于 2012-05-07T19:19:03.690 回答