在这里不应该责怪维基百科——这是 Async.Parallel 内部运作方式的结果。Async.Parallel 的类型签名是 seq<Async<'T>> -> Async<'T[]>:它返回一个包含序列中所有结果的单个 Async 值,因此在 seq<Async<'T>> 中的所有计算都完成之前,它不会返回。
为了说明,我修改了您的代码,以便跟踪未完成请求的数量,即已发送到服务器但尚未接收/解析响应的请求。
open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.WebExtensions
open System
open System.Net
open System.Threading
/// A WebClient subclass that applies a configurable timeout (in milliseconds)
/// to every underlying WebRequest it creates. The default of -1 means
/// "no timeout" (the WebRequest default).
type WebClientWithTimeout() =
    inherit WebClient()

    // Backing store for the Timeout property; -1 = infinite.
    let mutable timeoutMs = -1

    /// Timeout, in milliseconds, stamped onto each request created by this client.
    member __.Timeout
        with get () = timeoutMs
        and set value = timeoutMs <- value

    // Apply the configured timeout to every request this client creates.
    override this.GetWebRequest uri =
        let request = base.GetWebRequest uri
        request.Timeout <- this.Timeout
        request
/// Placeholder for a parsed HTML document (stub — real parsing is out of scope here).
type ParsedDoc = ParsedDoc
/// Placeholder for an article extracted from a parsed document (stub).
type ParsedArticle = ParsedArticle
/// Stub parser: ignores the input HTML and always yields a ParsedDoc.
let parseDoc (str : string) = ParsedDoc
/// Stub extractor: always succeeds (Some); a real implementation could return None.
let parseArticle (doc : ParsedDoc) = Some ParsedArticle
/// A synchronized wrapper around Console.Out so we don't
/// get garbled console output when multiple async computations
/// write concurrently (TextWriter.Synchronized serializes all writes).
let synchedOut =
    System.Console.Out
    |> System.IO.TextWriter.Synchronized
/// Downloads the page at [url] and parses it into an article, returning
/// None if parsing fails. [outstandingRequestCount] is incremented just
/// before the request is sent and decremented when it completes, so the
/// number of in-flight requests can be observed on the console.
let parseWikiAsync(url : string, outstandingRequestCount : int ref) =
    async {
        use wc = new WebClientWithTimeout(Timeout = 5000)
        wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")

        // Increment the outstanding request count just before we send the request.
        do
            // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
            // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
            // to be created which somehow defeats the synchronization and garbles the output.
            let msg =
                Interlocked.Increment outstandingRequestCount
                |> sprintf "Outstanding requests: %i"
            synchedOut.WriteLine msg

        // FIX: the download itself can throw (e.g. on timeout). Running it inside
        // try/finally guarantees the counter is decremented even when it does;
        // previously a failed download left the counter permanently inflated.
        try
            let! html = wc.AsyncDownloadString(Uri(url))
            let ret =
                try html |> parseDoc |> parseArticle
                with ex ->
                    let msg = sprintf "%A" ex
                    synchedOut.WriteLine msg
                    None
            return ret
        finally
            // Decrement the outstanding request count now that the request
            // has completed -- successfully or not.
            let msg =
                Interlocked.Decrement outstandingRequestCount
                |> sprintf "Outstanding requests: %i"
            synchedOut.WriteLine msg
    }
/// Prints [msg] to the console and returns [value] unchanged,
/// so it can be inserted into a function pipeline for tracing.
let inline passThruWithMessage (msg : string) value =
    msg |> Console.WriteLine
    value
/// Retrieves 100 random Wikipedia articles. NOTE (the point of this example):
/// Async.Parallel starts and completes ALL 120 requests before any result is
/// consumed, so every request is outstanding at once.
let en100 =
    // Shared counter of requests sent but not yet answered/parsed.
    let outstandingRequestCount = ref 0
    // 120 requests are issued because some may fail to parse; only 100 are kept.
    seq { for _ in 1..120 ->
            parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", outstandingRequestCount) }
    |> Async.Parallel
    |> Async.RunSynchronously
    |> passThruWithMessage "Finished running all of the requests."
    |> Seq.choose id
    |> Seq.take 100
如果您编译并运行该代码,您将看到如下输出:
Outstanding requests: 4
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 3
Outstanding requests: 5
Outstanding requests: 6
Outstanding requests: 7
Outstanding requests: 8
Outstanding requests: 9
Outstanding requests: 10
Outstanding requests: 12
Outstanding requests: 14
Outstanding requests: 15
Outstanding requests: 16
Outstanding requests: 17
Outstanding requests: 18
Outstanding requests: 13
Outstanding requests: 19
Outstanding requests: 20
Outstanding requests: 24
Outstanding requests: 22
Outstanding requests: 26
Outstanding requests: 27
Outstanding requests: 28
Outstanding requests: 29
Outstanding requests: 30
Outstanding requests: 25
Outstanding requests: 21
Outstanding requests: 23
Outstanding requests: 11
Outstanding requests: 29
Outstanding requests: 28
Outstanding requests: 27
Outstanding requests: 26
Outstanding requests: 25
Outstanding requests: 24
Outstanding requests: 23
Outstanding requests: 22
Outstanding requests: 21
Outstanding requests: 20
Outstanding requests: 19
Outstanding requests: 18
Outstanding requests: 17
Outstanding requests: 16
Outstanding requests: 15
Outstanding requests: 14
Outstanding requests: 13
Outstanding requests: 12
Outstanding requests: 11
Outstanding requests: 10
Outstanding requests: 9
Outstanding requests: 8
Outstanding requests: 7
Outstanding requests: 6
Outstanding requests: 5
Outstanding requests: 4
Outstanding requests: 3
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 0
Finished running all of the requests.
如您所见,所有请求都是在解析之前发出的——因此,如果您的连接速度较慢,或者您正在尝试检索大量文档,则服务器可能会断开连接,因为它可能会认为您没有在接收它试图发送的响应。代码的另一个问题是,您需要在 seq 中明确指定要生成的元素数量,这会降低代码的可重用性。
更好的解决方案是在消费端代码需要时才检索和解析页面。(如果你想一想,这正是 F# 中 seq 的优点所在。)我们将首先创建一个函数,它接受一个 Uri 并产生一个 seq<Async<'T>>——即,它产生一个无限的 Async<'T> 值序列,每个值都会检索该 Uri 的内容,对其进行解析并返回结果。
/// Given a Uri, creates an infinite sequence whose elements are async
/// computations, each of which downloads the content at the Uri and returns
/// Some html, or None (after logging the exception) if the download fails.
let createDocumentSeq (uri : System.Uri) =
#if DEBUG
    let outstandingRequestCount = ref 0
#endif
    Seq.initInfinite <| fun _ ->
        async {
            use wc = new WebClientWithTimeout(Timeout = 5000)
            wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")
#if DEBUG
            // Increment the outstanding request count just before we send the request.
            do
                // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
                // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
                // to be created which somehow defeats the synchronization and garbles the output.
                let msg =
                    Interlocked.Increment outstandingRequestCount
                    |> sprintf "Outstanding requests: %i"
                synchedOut.WriteLine msg
#endif
            // FIX: the download must happen INSIDE the try block. The original
            // wrapped only 'Some html' (which cannot throw) while the download
            // ran outside the try, so timeouts/network errors escaped the
            // 'with' handler instead of being logged and turned into None.
            let! ret =
                async {
                    try
                        let! html = wc.AsyncDownloadString uri
                        return Some html
                    with ex ->
                        let msg = sprintf "%A" ex
                        synchedOut.WriteLine msg
                        return None
                }
#if DEBUG
            // Decrement the outstanding request count now that we've
            // received a response (or a failure) for this request.
            do
                let msg =
                    Interlocked.Decrement outstandingRequestCount
                    |> sprintf "Outstanding requests: %i"
                synchedOut.WriteLine msg
#endif
            return ret
        }
现在我们使用此函数将页面作为流检索:
/// Streaming version: pulls one page at a time -- each Async runs (and its
/// response is parsed) only when the consuming code demands the next element.
let en100_Streaming =
#if DEBUG
    // Counts how many documents have been successfully parsed so far.
    let documentCount = ref 0
#endif
    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    // Run each download synchronously as it is demanded; drop failures.
    |> Seq.choose (fun asyncDoc ->
        Async.RunSynchronously asyncDoc
        |> Option.bind (parseDoc >> parseArticle))
#if DEBUG
    |> Seq.map (fun x ->
        // Build the message first, THEN write it (see note in createDocumentSeq).
        let msg =
            Interlocked.Increment documentCount
            |> sprintf "Parsed documents: %i"
        synchedOut.WriteLine msg
        x)
#endif
    |> Seq.take 50
    // None of the computations actually take place until
    // this point, because Seq.toArray forces evaluation of the sequence.
    |> Seq.toArray
如果您运行该代码,您会看到它一次从服务器中提取一个结果,并且不会留下未完成的请求。此外,更改要检索的结果数量也非常容易——您需要做的只是修改传递给 Seq.take 的数值。
现在,虽然流式代码工作得很好,但它不会并行执行请求,因此对于大量文档可能会很慢。这是一个很容易解决的问题,尽管解决方案可能有点不直观。与其尝试并行执行整个请求序列——这正是原始代码中的问题——不如创建一个函数,用 Async.Parallel 并行执行小批量的请求,然后用 Seq.collect 将结果组合回一个平面序列。
/// Given a sequence of Async<'T>, creates a new sequence whose elements
/// are computed in parallel batches of (at most) the specified size.
/// Evaluation stays lazy: a batch only executes when its results are demanded.
let parallelBatch batchSize (sequence : seq<Async<'T>>) =
    sequence
    // FIX: Seq.windowed yields OVERLAPPING sliding windows, so every element
    // after the first would be executed up to batchSize times and a trailing
    // partial batch would be silently dropped. Seq.chunkBySize yields
    // disjoint batches and keeps the final (possibly shorter) batch.
    |> Seq.chunkBySize batchSize
    |> Seq.collect (fun batch ->
        batch
        |> Async.Parallel
        |> Async.RunSynchronously)
要使用这个功能,我们只需要对流媒体版本的代码进行一些小的调整:
/// Batched version: like en100_Streaming, but runs requests in parallel
/// batches of [batchSize] for better throughput on many documents.
let en100_Batched =
    // Keep this reasonably small to avoid flooding the server.
    let batchSize = 10
#if DEBUG
    // Counts how many documents have been successfully parsed so far.
    let documentCount = ref 0
#endif
    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    // Execute batches in parallel
    |> parallelBatch batchSize
    |> Seq.choose (Option.bind (parseDoc >> parseArticle))
#if DEBUG
    |> Seq.map (fun x ->
        // Build the message first, THEN write it (see note in createDocumentSeq).
        let msg =
            Interlocked.Increment documentCount
            |> sprintf "Parsed documents: %i"
        synchedOut.WriteLine msg
        x)
#endif
    |> Seq.take 50
    // None of the computations actually take place until
    // this point, because Seq.toArray forces evaluation of the sequence.
    |> Seq.toArray
同样,更改要检索的文档数量很容易,并且可以轻松修改批量大小(再次,我建议您将其保持在合理的小范围内)。如果您愿意,您可以对“流式处理”和“批处理”代码进行一些调整,以便在运行时在它们之间切换。
最后一件事——使用我的代码,请求不应该再超时,因此您或许可以去掉 WebClientWithTimeout 类,直接使用 WebClient。