如果您将操作分解为一个异步处理一个请求然后调用它 100 次的方法,这可能会更容易。
首先,让我们确定您想要的最终结果。由于您将使用的是 aMemoryStream
这意味着您将希望Task<MemoryStream>
从您的方法中返回 a 。签名将如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
因为您的AmazonS3
对象实现了异步设计模式,所以您可以使用类上的FromAsync
方法从实现异步设计模式的TaskFactory
类中生成一个Task<T>
,如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null);
// But what goes here?
所以你已经在一个好地方,你有一个Task<T>
你可以等待或在呼叫完成时得到回调的地方。但是,您需要以某种方式将GetObjectResponse
调用返回的 toTask<GetObjectResponse>
转换为MemoryStream
.
为此,您希望在类上使用该ContinueWith
方法Task<T>
。将其视为类上Select
方法的异步版本,它只是对另一个的投影,除了每次调用时,您都可能创建一个运行该部分代码的新任务。Enumerable
Task<T>
ContinueWith
这样,您的方法如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
// Start the task of downloading.
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null
);
// Translate.
Task<MemoryStream> translation = response.ContinueWith(t => {
using (Task<GetObjectResponse> resp = t ){
var ms = new MemoryStream();
t.Result.ResponseStream.CopyTo(ms);
return ms;
}
});
// Return the full task chain.
return translation;
}
请注意,在上面您可能会调用 pass 的重载,ContinueWith
TaskContinuationOptions.ExecuteSynchronously
因为看起来您正在做最少的工作(我不知道,响应可能是巨大的)。如果您正在做的工作非常少,而为了完成工作而开始一项新任务是有害的,您应该通过TaskContinuationOptions.ExecuteSynchronously
,这样您就不会浪费时间为最少的操作创建新任务。
现在您有了可以将一个请求转换为 的方法Task<MemoryStream>
,创建一个可以处理任意数量的请求的包装器很简单:
static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
// Just call Select on the requests, passing our translation into
// a Task<MemoryStream>.
// Also, materialize here, so that the tasks are "hot" when
// returned.
return requests.Select(r => GetMemoryStreamAsync(s3, r)).
ToArray();
}
在上面,您只需获取一系列GetObjectRequest
实例,它将返回一个Task<MemoryStream>
. 它返回一个物化序列这一事实很重要。如果您在返回之前未将其具体化,则在迭代序列之前不会创建任务。
当然,如果您想要这种行为,那么无论如何,只需删除对 的调用.ToArray()
,让方法返回IEnumerable<Task<MemoryStream>>
,然后在您遍历任务时发出请求。
从那里,您可以一次处理它们(使用循环中的Task.WaitAny
方法)或等待所有它们完成(通过调用Task.WaitAll
方法)。后者的一个例子是:
static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
Task.WaitAll(tasks);
return tasks.Select(t => t.Result).ToList();
}
另外,应该提到的是,这非常适合Reactive Extensions 框架,因为这非常适合IObservable<T>
实现。