
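I often need to make a large number of web requests without overloading the network.

I currently do this by running synchronous requests in parallel, using ThreadPool.SetMinThreads and MaxDegreeOfParallelism to specify exactly how many requests run at the same time.

This works fine, but it feels wrong.

I would really like to use the asynchronous methods, but I can't work out how to limit the number of concurrent requests.

A simplified example of my parallel approach (using WebClient, with no error handling for brevity):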

Private Function SearchSitesForKeywordInParallel(ByVal keyword As String, ByVal sites As String(), ByVal maxConcurrency As Integer) As String()
    Dim po As New ParallelOptions
    po.MaxDegreeOfParallelism = maxConcurrency
    Threading.ThreadPool.SetMinThreads(maxConcurrency, 2)
    Dim sitesContainingKeyword As New Concurrent.ConcurrentBag(Of String)

    Parallel.For(0, sites.Count, po, Sub(i)
                                         Dim wc As New Net.WebClient
                                         wc.Proxy = Nothing
                                         Dim pageSource As String = wc.DownloadString(sites(i))
                                         If pageSource.Contains(keyword) Then
                                             sitesContainingKeyword.Add(sites(i))
                                         End If
                                     End Sub)
    Return sitesContainingKeyword.ToArray
End Function

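This is a blocking function, which is what I need. I have also tested the WebClient.DownloadStringAsync method in a regular For loop, and it fires all the requests almost immediately, overloading the network.

What I would like to do is start with X requests and then issue a new one as each response comes back.

I'm fairly sure Tasks are the way to go, and I'm sure I have read some very good C# implementations, but my C# experience is limited and I have a hard time translating C# lambdas to VB.NET.

I'm also limited to VS2010 and .NET 4, so the .NET 4.5 async/await niceties are not an option for me.

Any help is much appreciated.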


3 Answers


I'm not sure I fully understand what you are trying to achieve, but if you want to use the async methods, you can do it like this:

    Dim google As String = "http://www.google.com/#&q="

    Dim qsites As New Concurrent.ConcurrentQueue(Of String)
    For Each k In {"foo", "bar", "john", "jack", "stackoverflow", "basic", "ship", "car", "42"}
        qsites.Enqueue(google & k)
    Next

    Dim cde As New System.Threading.CountdownEvent(qsites.Count)

    Dim strings As New Concurrent.ConcurrentBag(Of String)
    Dim completedhandler = Sub(wco As Object, ev As Net.DownloadStringCompletedEventArgs)
                               Dim wc = DirectCast(wco, Net.WebClient)
                               Debug.Print("got one!")
                               strings.Add(ev.Result)
                               cde.Signal()
                               Dim s As String = String.Empty
                               If qsites.TryDequeue(s) Then
                                   Debug.Print("downloading from {0}", s)
                                   wc.DownloadStringAsync(New Uri(s))
                               End If
                           End Sub

    Dim numthreads As Integer = 4

    System.Threading.Tasks.Task.Factory.StartNew(Sub()
                                                     For i = 1 To numthreads
                                                         Dim s As String = String.Empty
                                                         If qsites.TryDequeue(s) Then
                                                             Dim wc As New Net.WebClient
                                                             wc.Proxy = Nothing
                                                             AddHandler wc.DownloadStringCompleted, completedhandler
                                                             Debug.Print("downloading from {0}", s)
                                                             wc.DownloadStringAsync(New Uri(s))
                                                         End If
                                                     Next
                                                 End Sub)

    cde.Wait()

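You need to start the async downloads on a different thread/task only because (AFAIK) WebClient's DownloadStringCompleted event is raised on the UI thread (or the current SynchronizationContext), and cde.Wait() would then prevent the events from being processed.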

Answered 2012-12-18T15:30:37.557

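I just want to add another answer here, since I recently solved a similar problem (note that the code snippets are in C#, but they should give the idea).

I used to send a number of parallel synchronous HTTP requests to an HTTP server on different threads, and I used a semaphore to limit the number of requests in flight.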
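For the VB.NET / .NET 4 case in the question, the semaphore approach could look roughly like the sketch below. It is only an illustration under assumptions: the module name `SemaphoreThrottleSketch` and the helper `DownloadAll` are hypothetical, failed downloads are simply skipped, and each request still blocks a thread while it waits.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Net
Imports System.Threading
Imports System.Threading.Tasks

Module SemaphoreThrottleSketch
    ' Hypothetical helper: downloads every URL while never running more
    ' than maxConcurrency blocking requests at the same time.
    Public Function DownloadAll(ByVal urls As IEnumerable(Of String), ByVal maxConcurrency As Integer) As String()
        Dim pages As New ConcurrentBag(Of String)
        Dim tasks As New List(Of Task)
        Using gate As New SemaphoreSlim(maxConcurrency)
            For Each url As String In urls
                Dim currentUrl As String = url ' copy so each lambda captures its own value
                gate.Wait() ' blocks the caller until a slot is free
                tasks.Add(Task.Factory.StartNew(Sub()
                                                    Try
                                                        Using wc As New WebClient()
                                                            wc.Proxy = Nothing
                                                            pages.Add(wc.DownloadString(currentUrl))
                                                        End Using
                                                    Catch ex As WebException
                                                        ' Assumption: failed downloads are skipped in this sketch.
                                                    Finally
                                                        gate.Release() ' free the slot for the next request
                                                    End Try
                                                End Sub, TaskCreationOptions.LongRunning))
            Next
            ' Wait for the requests that are still running.
            Task.WaitAll(tasks.ToArray())
        End Using
        Return pages.ToArray()
    End Function
End Module
```

Calling `SemaphoreThrottleSketch.DownloadAll(sites, 4)` would block until every page has been fetched, with at most four requests in flight. The `LongRunning` hint is there so that the blocked requests do not tie up ordinary thread pool threads.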

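Now I have moved to the TPL with C# 5.0 async/await, which is very convenient (the continuations introduced with the TPL already felt natural to me, and async/await makes them even easier to use), and I make the network I/O calls asynchronously.

That is, ideally I now use only one thread in the caller (unless I really need the result before continuing) and let .NET, the OS, and the I/O completion port threads work together to invoke my continuation code on the thread pool to finish the operation (essentially the callback in APM, the completed event in the event-based pattern, the continuation in the TPL, and the code after await in C# 5.0 / .NET 4.5).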
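Without async/await (VS2010, .NET 4), the same idea can be approximated with `TaskCompletionSource` and `ContinueWith`. The following is a minimal sketch, not a definitive implementation: `ContinuationThrottleSketch`, `DownloadAsync` and `DownloadAll` are hypothetical names, failed downloads are dropped, and it assumes the caller has no SynchronizationContext (a console app or a background thread), because the final `Wait()` would otherwise block the thread that WebClient posts its completed events to.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Net
Imports System.Threading
Imports System.Threading.Tasks

Module ContinuationThrottleSketch
    ' Wraps WebClient's event-based download in a Task (usable on .NET 4).
    Private Function DownloadAsync(ByVal url As String) As Task(Of String)
        Dim tcs As New TaskCompletionSource(Of String)
        Dim wc As New WebClient()
        wc.Proxy = Nothing
        AddHandler wc.DownloadStringCompleted, Sub(sender As Object, e As DownloadStringCompletedEventArgs)
                                                   If e.Error IsNot Nothing Then
                                                       tcs.TrySetException(e.Error)
                                                   ElseIf e.Cancelled Then
                                                       tcs.TrySetCanceled()
                                                   Else
                                                       tcs.TrySetResult(e.Result)
                                                   End If
                                                   wc.Dispose()
                                               End Sub
        Try
            wc.DownloadStringAsync(New Uri(url))
        Catch ex As Exception
            ' A malformed URL or start-up failure becomes a faulted task.
            tcs.TrySetException(ex)
        End Try
        Return tcs.Task
    End Function

    ' Starts maxConcurrency downloads, then one more each time one finishes.
    Public Function DownloadAll(ByVal urls As IEnumerable(Of String), ByVal maxConcurrency As Integer) As String()
        Dim pending As New ConcurrentQueue(Of String)(urls)
        Dim pages As New ConcurrentBag(Of String)
        Dim done As New CountdownEvent(pending.Count)
        Dim startNext As Action = Nothing
        startNext = Sub()
                        Dim url As String = Nothing
                        If Not pending.TryDequeue(url) Then Return
                        DownloadAsync(url).ContinueWith(Sub(t As Task(Of String))
                                                            If t.Status = TaskStatus.RanToCompletion Then
                                                                pages.Add(t.Result)
                                                            ElseIf t.IsFaulted Then
                                                                ' Read the exception so it counts as observed.
                                                                Dim ignored As Exception = t.Exception
                                                            End If
                                                            startNext()   ' keep the number in flight constant
                                                            done.Signal() ' one signal per dequeued URL
                                                        End Sub, TaskScheduler.Default)
                    End Sub
        For i As Integer = 1 To maxConcurrency
            startNext()
        Next
        done.Wait() ' only the caller blocks; no thread waits per request
        Return pages.ToArray()
    End Function
End Module
```

The blocking `Wait()` at the end is only there because the question asks for a blocking function; the downloads themselves do not hold a thread while the network I/O is pending.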

The principle I follow when embracing asynchronous I/O is simple: don't let a thread wait and waste CPU and resources unless it is really necessary!

Answered 2014-06-26T21:37:51.543

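You can do this asynchronously in VB.NET with the AsyncEnumerator class from the Wintellect Power Threading library, which is available from NuGet.

This gives you some of the functionality of Await, but works in VS2010 with .NET 2.0 to 4.0, while giving you an upgrade path to the 4.5 async features.

The downside is that the WebClient async methods need a Task<>-based EAP-to-APM shim before they can be used with AsyncEnumerator, so the code is considerably more complex.

The simplest way to control the number of concurrent requests is to start X async operations and then start another each time one completes.

Example code: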

Imports System.Collections.Generic
Imports System.Runtime.CompilerServices
Imports System.Threading.Tasks
Imports System.Net
Imports Wintellect.Threading.AsyncProgModel

Module TaskExtension
    REM http://msdn.microsoft.com/en-us/library/hh873178.aspx
    <Extension()>
    Public Function AsApm(Of T1)(ByVal task As Task(Of T1), callback As AsyncCallback, state As Object) As IAsyncResult
        If (task Is Nothing) Then
            Throw New ArgumentNullException("task")
        End If
        Dim tcs = New TaskCompletionSource(Of T1)(state)
        task.ContinueWith(Sub(t As Task(Of T1))
                              If (t.IsFaulted) Then
                                  tcs.TrySetException(t.Exception.InnerExceptions)
                              ElseIf t.IsCanceled Then
                                  tcs.TrySetCanceled()
                              Else : tcs.TrySetResult(t.Result)
                              End If
                              If (Not callback Is Nothing) Then
                                  callback(tcs.Task)
                              End If
                          End Sub, TaskScheduler.Default)
        Return tcs.Task
    End Function
End Module

Module ApmAsyncDownload
    Public Function DownloadStringAsync(url As Uri) As Task(Of String)
        Dim tcs As New TaskCompletionSource(Of String)
        Dim wc As New WebClient()
        AddHandler wc.DownloadStringCompleted, Sub(s As Object, e As System.Net.DownloadStringCompletedEventArgs)
                                                   If (Not (e.Error Is Nothing)) Then
                                                       tcs.TrySetException(e.Error)
                                                   ElseIf e.Cancelled Then
                                                       tcs.TrySetCanceled()
                                                   Else : tcs.TrySetResult(e.Result)
                                                   End If
                                               End Sub
        wc.DownloadStringAsync(url)
        Return tcs.Task
    End Function
    Public Function BeginDownloadString(url As Uri, callback As AsyncCallback, state As Object) As IAsyncResult
        Return DownloadStringAsync(url).AsApm(callback, state)
    End Function
    Public Function EndDownloadString(asyncResult As IAsyncResult) As String
        Dim castToTask As Task(Of String) = DirectCast(asyncResult, Task(Of String))
        Return castToTask.Result
    End Function
End Module

Public Class AsyncIterators
    Private Shared Iterator Function SearchUrl(ae As AsyncEnumerator(Of Boolean), keyword As String, uri As Uri) As IEnumerator(Of Int32)
        ae.Result = False
        ApmAsyncDownload.BeginDownloadString(uri, ae.End(0, AddressOf ApmAsyncDownload.EndDownloadString), Nothing)
        Yield 1
        If (ae.IsCanceled()) Then
            Return
        End If
        Try
            Dim page As String = ApmAsyncDownload.EndDownloadString(ae.DequeueAsyncResult)
            ae.Result = page.Contains(keyword)
        Catch ex As AggregateException
        End Try
    End Function
    Public Shared Iterator Function SearchIterator(ae As AsyncEnumerator(Of List(Of String)), keyword As String, urls As List(Of Uri)) As IEnumerator(Of Int32)
        ae.Result = New List(Of String)
        'Control how many searches are started asynchronously
        Dim startSearches = Math.Min(3, urls.Count)
        Dim enumerator = urls.GetEnumerator
        Dim toBeCompleted = urls.Count
        Do Until (toBeCompleted <= 0)
            While (startSearches > 0)
                If enumerator.MoveNext Then
                    Dim subAe = New AsyncEnumerator(Of Boolean)()
                    subAe.SyncContext = Nothing
                    subAe.BeginExecute(SearchUrl(subAe, keyword, enumerator.Current), ae.End(0, Function(ar As IAsyncResult) As AsyncEnumerator.EndObjectXxx
                                                                                                    subAe.EndExecute(ar)
                                                                                                End Function), enumerator.Current)
                End If
                startSearches = startSearches - 1
            End While
            'Wait for first async search to complete
            Yield 1
            toBeCompleted = toBeCompleted - 1
            If (ae.IsCanceled()) Then
                Exit Do
            End If
            'Get result of the search and add to results
            Dim result = ae.DequeueAsyncResult()
            Dim completedAe = AsyncEnumerator(Of Boolean).FromAsyncResult(result)
            If (completedAe.EndExecute(result)) Then
                Dim uri As Uri = DirectCast(result.AsyncState, Uri)
                ae.Result.Add(uri.OriginalString)
            End If
            'Start 1 more search
            startSearches = startSearches + 1
        Loop
    End Function
End Class

Module Module1
    Sub Main()
        Dim searchAe = New AsyncEnumerator(Of List(Of String))()
        searchAe.SyncContext = Nothing
        Dim urlStrings = New List(Of String) From {"http://www.google.com", "http://www.yahoo.com", "http://www.dogpile.com"}
        Dim uris = urlStrings.Select(Function(urlString As String) As Uri
                                         Return New Uri(urlString)
                                     End Function).ToList()
        For Each Str As String In searchAe.EndExecute(searchAe.BeginExecute(AsyncIterators.SearchIterator(searchAe, "search", uris), Nothing, Nothing))
            Console.WriteLine(Str)
        Next
        Console.ReadKey()
    End Sub
End Module

Now I understand what you mean about translating C# lambdas!

Answered 2012-12-17T19:10:28.267