0

我的输入是位于 Amazon S3 服务器上的一长串文件。我想下载文件的元数据,计算本地文件的哈希值,并将元数据哈希值与本地文件的哈希值进行比较。

目前,我使用一个循环来异步启动所有元数据下载,然后在每次完成后,根据需要在本地文件上计算 MD5 并进行比较。这是代码(只是相关行):

Dim s3client As New AmazonS3Client(KeyId.Text, keySecret.Text)
Dim responseTasks As New List(Of System.Tuple(Of ListViewItem, Task(Of GetObjectMetadataResponse)))
For Each lvi As ListViewItem In lvStatus.Items
    Dim gomr As New Amazon.S3.Model.GetObjectMetadataRequest
    gomr.BucketName = S3FileDialog.GetBucketName(lvi.SubItems(2).Text)
    gomr.Key = S3FileDialog.GetPrefix(lvi.SubItems(2).Text)
    responseTasks.Add(New System.Tuple(Of ListViewItem, Task(Of GetObjectMetadataResponse))(lvi, s3client.GetObjectMetadataAsync(gomr)))
Next
For Each t As System.Tuple(Of ListViewItem, Task(Of GetObjectMetadataResponse)) In responseTasks
    Dim response As GetObjectMetadataResponse = Await t.Item2
    If response.ETag.Trim(""""c) = MD5CalcFile(lvi.SubItems(1).Text) Then
        lvi.SubItems(3).Text = "Match"
        UpdateLvi(lvi)
    End If
Next

我有两个问题:

  1. 我正在按照我制作它们的顺序等待回复。我宁愿按照它们完成的顺序处理它们,以便我更快地得到它们。

  2. MD5 计算时间长且同步。我尝试使其异步但进程锁定。我认为 MD5 任务已添加到 .Net 任务列表的末尾,并且直到所有下载完成后才开始运行。

理想情况下,我在响应到达时对其进行处理,而不是按顺序处理,MD5 是异步的,但有机会运行。

编辑:

结合WhenAll,它现在看起来像这样:

Dim s3client As New Amazon.S3.AmazonS3Client(KeyId.Text, keySecret.Text)
Dim responseTasks As New Dictionary(Of Task(Of GetObjectMetadataResponse), ListViewItem)
    For Each lvi As ListViewItem In lvStatus.Items
        Dim gomr As New Amazon.S3.Model.GetObjectMetadataRequest
        gomr.BucketName = S3FileDialog.GetBucketName(lvi.SubItems(2).Text)
        gomr.Key = S3FileDialog.GetPrefix(lvi.SubItems(2).Text)
        responseTasks.Add(s3client.GetObjectMetadataAsync(gomr), lvi)
    Next
    Dim startTime As DateTimeOffset = DateTimeOffset.Now
    Do While responseTasks.Count > 0
        Dim currentTask As Task(Of GetObjectMetadataResponse) = Await Task.WhenAny(responseTasks.Keys)
        Dim response As GetObjectMetadataResponse = Await currentTask
        If response.ETag.Trim(""""c) = MD5CalcFile(lvi.SubItems(1).Text) Then
            lvi.SubItems(3).Text = "Match"
            UpdateLvi(lvi)
        End If
    Loop
    MsgBox((DateTimeOffset.Now - startTime).ToString)

只要 MDSCalcFile 完成,UI 就会立即锁定。整个循环大约需要 45 秒,第一个文件的 MD5 结果发生在开始后的 1 秒内。

如果我将行更改为:

        If response.ETag.Trim(""""c) = Await Task.Run(Function () MD5CalcFile(lvi.SubItems(1).Text)) Then

完成 MD5CalcFile 后,UI 不会锁定。整个循环大约需要 75 秒,从 45 秒开始,第一个文件的 MD5 结果发生在等待 40 秒之后。

编辑2:

我找到了一个适合我的解决方案。问题出在我的 GetObjectMetadataAsync 中。我写错了。评论中错误的正确版本如下:

<System.Runtime.CompilerServices.Extension>
Function GetObjectMetadataAsync(a As AmazonS3Client, l As GetObjectMetadataRequest) As Task(Of GetObjectMetadataResponse)
    Return Task.Factory.FromAsync(AddressOf a.BeginGetObjectMetadata, AddressOf a.EndGetObjectMetadata, l, Nothing)
    'Return Task.Run(Function()
    '                    Try
    '                        Return a.GetObjectMetadata(l)
    '                    Catch ex As Amazon.S3.AmazonS3Exception
    '                        If ex.ErrorCode = "NoSuchKey" Then
    '                            Return Nothing
    '                        Else
    '                            Throw ex
    '                        End If
    '                    End Try
    '                End Function)
End Function

我不知道为什么将同步版本放入线程或使用 FromAsync 很重要,但显然后者更好看,测试表明它要快得多。

4

2 回答 2

7

您可以WhenAny在任务结果完成时使用它们来处理它们:

while (responseTasks.Length > 0)
{
  var completedTask = await Task.WhenAny(responseTasks);
  responseTasks.Remove(completedTask);
  var response = await completedTask;
  ...
}

(对 C# 感到抱歉;我的 VB 语法太长了,无法正确)。

有关该主题的完整讨论,请参阅Stephen Toub 关于该主题的帖子

另一个选项是TPL Dataflow,它允许您构建一个“网格”供数据通过。对于此示例,Dataflow 可能有点矫枉过正,但如果您的实际处理更复杂,它会很有用。

就 MD5 而言,使其异步应该不是问题。基于异步 I/O 的任务(例如由 返回的任务GetObjectMetadataAsync)不消耗线程池线程。我会尝试其他一些场景(比如自己异步运行 MD5),然后如果没有出现明显的问题,则发布另一个问题。

于 2012-09-14T19:09:09.680 回答
0

也许 usingTask.ContinueWith会有所帮助,而不是 using await。它还可以帮助使代码比await在这些循环内部使用更简单。

笔记:

显然,我没有与您相同的可用数据类型,因此我使用基本数据类型和调试语句处理了我的示例,然后尝试将您的代码示例改进到其中。您可能需要稍微调整语法。

编辑:

我更新了我的示例以包括传递同步上下文以ContinueWith确保回调发生在 UI 线程上(假设此调用代码发生在 UI 线程上)。

参考:UI线程上的任务继续

...

Dim s3client As New AmazonS3Client(KeyId.Text, keySecret.Text)

For Each lvi As ListViewItem In lvStatus.Items

    Dim currentListItem = lvi

    Dim gomr As New Amazon.S3.Model.GetObjectMetadataRequest
    gomr.BucketName = S3FileDialog.GetBucketName(lvi.SubItems(2).Text)
    gomr.Key = S3FileDialog.GetPrefix(lvi.SubItems(2).Text)

    ' Pass this context to the ContinueWith method so that the callback is executed on the UI thread '
    Dim context = TaskScheduler.FromCurrentSynchronizationContext()
    Dim t = s3client.GetObjectMetadataAsync(gomr)
    t.ContinueWith(Sub(task) OnDownloadComplete(task.Result, currentListItem), context)
    t.Start()

Next

...

Private Sub OnDownloadComplete(response As GetObjectMetadataResponse, item As ListViewItem)
    If response.ETag.Trim(""""c) = MD5CalcFile(item.SubItems(1).Text) Then
        item.SubItems(3).Text = "Match"
        UpdateLvi(item)
    End If
End Sub
于 2012-09-14T21:26:34.540 回答