5

我有一些相当简单的 F# 异步代码可以从 Wikipedia 下载一百篇随机文章(用于研究)。

出于某种原因,代码在下载过程中会在任意时间点挂起。有时是50后,有时是80后。

异步代码本身相当简单:

let parseWikiAsync(url:string, count:int ref) =
    async {
            use wc = new WebClientWithTimeout(Timeout = 5000)
            let! html = wc.AsyncDownloadString(Uri(url))
            let ret =
                try html |> parseDoc |> parseArticle
                with | ex -> printfn "%A" ex; None
            lock count (fun () ->
                if !count % 10 = 0 then
                    printfn "%d" !count
                count := !count + 1
            )
            return ret
    }

因为我无法通过 fsi 找出问题所在,所以我制作了 WebClientWithTimeout,这是一个System.Net.WebClient允许我指定超时的包装器:

type WebClientWithTimeout() =
    inherit WebClient()
    member val Timeout = 60000 with get, set

    override x.GetWebRequest uri =
        let r = base.GetWebRequest(uri)
        r.Timeout <- x.Timeout
        r

然后我使用异步组合器检索超过一百页,并清除所有返回 parseWikiAsync 调用的文章None(其中大部分是“消歧页”),直到我有正好 100 篇文章:

let en100 =
    let count = ref 0
    seq { for _ in 1..110 -> parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", count) }
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Seq.choose id
    |> Seq.take 100

当我编译代码并在调试器中运行它时,只有三个线程,其中只有一个正在运行实际代码——异步管道。另外两个“不可用”的位置,调用堆栈中没有任何内容。

我认为这意味着它不会卡AsyncDownloadString在 parseWikiAsync 的任何地方或任何地方。还有什么可能导致这种情况?

哦,另外,异步代码实际开始之前大约需要整整一分钟。之后,它以相当合理的速度运行,直到它再次无限期挂起。

这是主线程的调用堆栈:

>   mscorlib.dll!System.Threading.WaitHandle.InternalWaitOne(System.Runtime.InteropServices.SafeHandle waitableSafeHandle, long millisecondsTimeout, bool hasThreadAffinity, bool exitContext) + 0x22 bytes 
    mscorlib.dll!System.Threading.WaitHandle.WaitOne(int millisecondsTimeout, bool exitContext) + 0x28 bytes    
    FSharp.Core.dll!Microsoft.FSharp.Control.AsyncImpl.ResultCell<Microsoft.FSharp.Control.AsyncBuilderImpl.Result<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>>.TryWaitForResultSynchronously(Microsoft.FSharp.Core.FSharpOption<int> timeout) + 0x36 bytes  
    FSharp.Core.dll!Microsoft.FSharp.Control.CancellationTokenOps.RunSynchronously<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>(System.Threading.CancellationToken token, Microsoft.FSharp.Control.FSharpAsync<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]> computation, Microsoft.FSharp.Core.FSharpOption<int> timeout) + 0x1ba bytes 
    FSharp.Core.dll!Microsoft.FSharp.Control.FSharpAsync.RunSynchronously<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>(Microsoft.FSharp.Control.FSharpAsync<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]> computation, Microsoft.FSharp.Core.FSharpOption<int> timeout, Microsoft.FSharp.Core.FSharpOption<System.Threading.CancellationToken> cancellationToken) + 0xb9 bytes   
    WikiSurvey.exe!<StartupCode$WikiSurvey>.$Program.main@() Line 97 + 0x55 bytes   F#
4

2 回答 2

8

在这里不应该责怪维基百科,这是Async.Parallel内部运作方式的结果。的类型签名Async.Parallelseq<Async<'T>> -> Async<'T[]>. 它返回一个包含序列中所有结果的单个 Async 值——因此在返回中的所有计算之前它不会seq<Async<'T>>返回。

为了说明,我修改了您的代码,以便跟踪未完成请求的数量,即已发送到服务器但尚未接收/解析响应的请求。

open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.WebExtensions
open System
open System.Net
open System.Threading

type WebClientWithTimeout() =
    inherit WebClient()

    let mutable timeout = -1
    member __.Timeout
        with get () = timeout
        and set value = timeout <- value

    override x.GetWebRequest uri =
        let r = base.GetWebRequest(uri)
        r.Timeout <- x.Timeout
        r

type ParsedDoc = ParsedDoc
type ParsedArticle = ParsedArticle

let parseDoc (str : string) = ParsedDoc
let parseArticle (doc : ParsedDoc) = Some ParsedArticle

/// A synchronized wrapper around Console.Out so we don't
/// get garbled console output.
let synchedOut =
    System.Console.Out
    |> System.IO.TextWriter.Synchronized

let parseWikiAsync(url : string, outstandingRequestCount : int ref) =
    async {
    use wc = new WebClientWithTimeout(Timeout = 5000)
    wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")

    // Increment the outstanding request count just before we send the request.
    do
        // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
        // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
        // to be created which somehow defeats the synchronization and garbles the output.
        let msg =
            Interlocked.Increment outstandingRequestCount
            |> sprintf "Outstanding requests: %i"
        synchedOut.WriteLine msg

    let! html = wc.AsyncDownloadString(Uri(url))
    let ret =
        try html |> parseDoc |> parseArticle
        with ex ->
            let msg = sprintf "%A" ex
            synchedOut.WriteLine msg
            None

    // Decrement the outstanding request count now that we've
    // received a reponse and parsed it.
    do
        let msg =
            Interlocked.Decrement outstandingRequestCount
            |> sprintf "Outstanding requests: %i"
        synchedOut.WriteLine msg

    return ret
    }

/// Writes a message to the console, passing a value through
/// so it can be used within a function pipeline.
let inline passThruWithMessage (msg : string) value =
    Console.WriteLine msg
    value

let en100 =
    let outstandingRequestCount = ref 0
    seq { for _ in 1..120 ->
            parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", outstandingRequestCount) }
    |> Async.Parallel
    |> Async.RunSynchronously
    |> passThruWithMessage "Finished running all of the requests."
    |> Seq.choose id
    |> Seq.take 100

如果您编译并运行该代码,您将看到如下输出:

Outstanding requests: 4
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 3
Outstanding requests: 5
Outstanding requests: 6
Outstanding requests: 7
Outstanding requests: 8
Outstanding requests: 9
Outstanding requests: 10
Outstanding requests: 12
Outstanding requests: 14
Outstanding requests: 15
Outstanding requests: 16
Outstanding requests: 17
Outstanding requests: 18
Outstanding requests: 13
Outstanding requests: 19
Outstanding requests: 20
Outstanding requests: 24
Outstanding requests: 22
Outstanding requests: 26
Outstanding requests: 27
Outstanding requests: 28
Outstanding requests: 29
Outstanding requests: 30
Outstanding requests: 25
Outstanding requests: 21
Outstanding requests: 23
Outstanding requests: 11
Outstanding requests: 29
Outstanding requests: 28
Outstanding requests: 27
Outstanding requests: 26
Outstanding requests: 25
Outstanding requests: 24
Outstanding requests: 23
Outstanding requests: 22
Outstanding requests: 21
Outstanding requests: 20
Outstanding requests: 19
Outstanding requests: 18
Outstanding requests: 17
Outstanding requests: 16
Outstanding requests: 15
Outstanding requests: 14
Outstanding requests: 13
Outstanding requests: 12
Outstanding requests: 11
Outstanding requests: 10
Outstanding requests: 9
Outstanding requests: 8
Outstanding requests: 7
Outstanding requests: 6
Outstanding requests: 5
Outstanding requests: 4
Outstanding requests: 3
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 0
Finished running all of the requests.

如您所见,所有请求都是在解析之前发出的——因此,如果您的连接速度较慢,或者您正在尝试检索大量文档,则服务器可能会断开连接,因为它可能会假设您没有检索它试图发送的响应。代码的另一个问题是您需要在 中明确指定要生成的元素数量seq,这会降低代码的可重用性。

更好的解决方案是在某些消费代码需要时检索和解析页面。(如果你想一想,这正是 F#seq的优点。)我们将首先创建一个函数,该函数接受一个 Uri 并产生一个seq<Async<'T>>- 即,它产生一个无限的Async<'T>值序列,每个值都将检索来自 Uri 的内容,对其进行解析并返回结果。

/// Given a Uri, creates an infinite sequence of whose elements are retrieved
/// from the Uri.
let createDocumentSeq (uri : System.Uri) =
    #if DEBUG
    let outstandingRequestCount = ref 0
    #endif

    Seq.initInfinite <| fun _ ->
        async {
        use wc = new WebClientWithTimeout(Timeout = 5000)
        wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")

        #if DEBUG
        // Increment the outstanding request count just before we send the request.
        do
            // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
            // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
            // to be created which somehow defeats the synchronization and garbles the output.
            let msg =
                Interlocked.Increment outstandingRequestCount
                |> sprintf "Outstanding requests: %i"
            synchedOut.WriteLine msg
        #endif

        let! html = wc.AsyncDownloadString uri
        let ret =
            try Some html
            with ex ->
                let msg = sprintf "%A" ex
                synchedOut.WriteLine msg
                None

        #if DEBUG
        // Decrement the outstanding request count now that we've
        // received a reponse and parsed it.
        do
            let msg =
                Interlocked.Decrement outstandingRequestCount
                |> sprintf "Outstanding requests: %i"
            synchedOut.WriteLine msg
        #endif

        return ret
        }

现在我们使用此函数将页面作为流检索:

//
let en100_Streaming =
    #if DEBUG
    let documentCount = ref 0
    #endif

    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    |> Seq.choose (fun asyncDoc ->
        Async.RunSynchronously asyncDoc
        |> Option.bind (parseDoc >> parseArticle))
    #if DEBUG
    |> Seq.map (fun x ->
        let msg =
            Interlocked.Increment documentCount
            |> sprintf "Parsed documents: %i"
        synchedOut.WriteLine msg
        x)
    #endif
    |> Seq.take 50
    // None of the computations actually take place until
    // this point, because Seq.toArray forces evaluation of the sequence.
    |> Seq.toArray

如果您运行该代码,您会看到它一次从服务器中提取一个结果,并且不会留下未完成的请求。此外,更改要检索的结果数量非常容易——您需要做的就是更改传递给Seq.take.

现在,虽然流式代码工作得很好,但它不会并行执行请求,因此对于大量文档可能会很慢。这是一个很容易解决的问题,尽管解决方案可能有点不直观。与其尝试并行执行整个请求序列——这是原始代码中的问题——让我们创建一个函数,用于并行Async.Parallel执行批量请求,然后Seq.collect将结果组合回一个平面序列.

/// Given a sequence of Async<'T>, creates a new sequence whose elements
/// are computed in batches of a specified size.
let parallelBatch batchSize (sequence : seq<Async<'T>>) =
    sequence
    |> Seq.windowed batchSize
    |> Seq.collect (fun batch ->
        batch
        |> Async.Parallel
        |> Async.RunSynchronously)

要使用这个功能,我们只需要对流媒体版本的代码进行一些小的调整:

let en100_Batched =
    let batchSize = 10
    #if DEBUG
    let documentCount = ref 0
    #endif

    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    // Execute batches in parallel
    |> parallelBatch batchSize
    |> Seq.choose (Option.bind (parseDoc >> parseArticle))
    #if DEBUG
    |> Seq.map (fun x ->
        let msg =
            Interlocked.Increment documentCount
            |> sprintf "Parsed documents: %i"
        synchedOut.WriteLine msg
        x)
    #endif
    |> Seq.take 50
    // None of the computations actually take place until
    // this point, because Seq.toArray forces evaluation of the sequence.
    |> Seq.toArray

同样,更改要检索的文档数量很容易,并且可以轻松修改批量大小(再次,我建议您将其保持在合理的小范围内)。如果您愿意,您可以对“流式处理”和“批处理”代码进行一些调整,以便在运行时在它们之间切换。

最后一件事——使用我的代码,请求不应该超时,因此您可能可以摆脱WebClientWithTimeout该类并直接使用WebClient

于 2012-08-30T18:26:47.550 回答
2

您的代码似乎没有做任何特别的事情,所以我假设维基百科不喜欢您的活动。看看他们的机器人政策。再深入一点,他们似乎也有严格的用户代理政策

自 2010 年 2 月 15 日起,维基媒体站点要求所有请求的 HTTP 用户代理标头。这是技术人员做出的操作性决定,并在技术邮件列表中公布和讨论。[1][2] 理由是,不发送 User-Agent 字符串的客户端大多是行为不端的脚本,会导致服务器上的大量负载,而不会使项目受益。请注意,用户代理字符串的非描述性默认值(例如 Perl 的 libwww 使用的)也可能会被阻止使用 Wikimedia 网站(或部分网站,例如 api.php)。

不发送 User-Agent 标头的用户代理(浏览器或脚本)现在可能会遇到如下错误消息:

脚本应使用包含联系信息的用户代理字符串,否则它们可能会被 IP 阻止,恕不另行通知。

因此,根据我的发现,即使您添加了适当的用户代理,他们也可能不喜欢您正在做的事情,但您不妨试一试。

wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")

避免与他们的服务器建立如此多的连接也没有什么坏处。

于 2012-08-29T04:12:34.457 回答