3

我正在玩写一个非常简单的异步测试框架之类的东西。但我认为我遇到了某种限制或错误。抱歉,我无法在较小的代码库上重现它。

这是我提出的基本框架:

module TestRunner
    open System

    type TestOptions = {
        Writer : ConsoleColor -> string -> unit}
    type TestResults = {
        Time : TimeSpan
        Failure : exn option
        }
    type Test = {
        Name : string
        Finished : IEvent<TestResults>
        SetFinished : TestResults -> unit
        TestFunc : TestOptions -> Async<TestResults> }

    let createTest name f =  
        let ev = new Event<TestResults>()
        {
            Name = name 
            Finished = ev.Publish
            SetFinished = (fun res -> ev.Trigger res)
            TestFunc = 
                (fun options -> async {
                    let watch = System.Diagnostics.Stopwatch.StartNew()
                    try
                        do! f options
                        watch.Stop()
                        return { Failure = None; Time = watch.Elapsed }
                    with exn ->
                        watch.Stop()
                        return { Failure = Some exn; Time = watch.Elapsed }
                    })}

    let simpleTest name f = 
        createTest name (fun options -> f options.Writer)

    /// Create a new Test and change the result
    let mapResult mapping test = 
        { test with
            TestFunc = 
                (fun options -> async {
                    let! result = test.TestFunc options
                    return mapping result})}

    let writeConsole color f = 
        let old = System.Console.ForegroundColor
        try
            System.Console.ForegroundColor <- color
            f()
        finally
            System.Console.ForegroundColor <- old

    let printColor color (text:String) = 
        writeConsole color (fun _ -> Console.WriteLine(text))


    type WriterMessage = 
        | NormalWrite of ConsoleColor * String
        | StartTask of AsyncReplyChannel<int> * String
        | WriteMessage of int * ConsoleColor * String
        | EndTask of int

    /// will handle printing jobs for two reasons
    /// 1. Nice output grouped by tests (StartTask,WriteMessage,EndTask)
    /// 2. Print Summary after all tests finished (NormalWrite)
    let writer = MailboxProcessor.Start (fun inbox -> 
        let currentTask = ref 0
        let newHandle (returnHandle:AsyncReplyChannel<int>) = 
            let handle = System.Threading.Interlocked.Increment currentTask
            returnHandle.Reply handle
            handle 

        // the tasks describe which tasks are currently waiting to be processed
        let rec loop tasks = async {
            let! newTasks =
                match tasks with
                /// We process the Task with the number t and the name name
                | (t, name) :: next -> 
                    inbox.Scan
                        (fun msg -> 
                            match msg with
                            | EndTask (endTask) -> 
                                // if the message is from the current task finish it
                                if t = endTask then
                                    Some (async { return next })
                                else None
                            | WriteMessage(writeTask, color, message) ->
                                if writeTask = t then 
                                    Some (async {
                                        printColor color (sprintf "Task %s: %s" name message)
                                        return tasks
                                    })
                                else None
                            | StartTask (returnHandle, name) -> 
                                // Start any tasks instantly and add them to the list (because otherwise they would just wait for the resonse)
                                Some (async { 
                                    let handle = newHandle returnHandle
                                    return (List.append tasks [handle, name]) })
                            | _ -> None)
                // No Current Tasks so just start ones or process the NormalWrite messages
                | [] ->
                    inbox.Scan     
                        (fun msg -> 
                            match msg with
                            | StartTask (returnHandle, name) -> 
                                Some (async { 
                                    let handle = newHandle returnHandle
                                    return [handle, name] })
                            | NormalWrite(color, message) ->
                                Some (async {
                                    printColor color message
                                    return []
                                })
                            | _ -> None)   

            return! loop newTasks 
        }
        loop [])

    /// Write a normal message via writer
    let writerWrite color (text:String) = 
        writer.Post(NormalWrite(color, text))

    /// A wrapper around the communication (to not miss EndTask for a StartTask)
    let createTestWriter name f = async {
        let! handle = writer.PostAndAsyncReply(fun reply -> StartTask(reply, name))
        try
            let writer color s = 
                writer.Post(WriteMessage(handle,color,s))
            return! f(writer)
        finally
            writer.Post (EndTask(handle))
        }
    /// Run the given test and print the results
    let testRun t = async {
        let! results = createTestWriter t.Name (fun writer -> async {
            writer ConsoleColor.Green (sprintf "started")
            let! results = t.TestFunc { Writer = writer }
            match results.Failure with
            | Some exn -> 
                writer ConsoleColor.Red (sprintf "failed with %O" exn)
            | None ->
                writer ConsoleColor.Green (sprintf "succeeded!")
            return results}) 
        t.SetFinished results
        }
    /// Start the given task with the given amount of workers
    let startParallelMailbox workerNum f = 
        MailboxProcessor.Start(fun inbox ->
            let workers = Array.init workerNum (fun _ -> MailboxProcessor.Start f)
            let rec loop currentNum = async {
                let! msg = inbox.Receive()
                workers.[currentNum].Post msg
                return! loop ((currentNum + 1) % workerNum)
            }
            loop 0 )
    /// Runs all posted Tasks
    let testRunner = 
        startParallelMailbox 10 (fun inbox ->
            let rec loop () = async {
                let! test = inbox.Receive()
                do! testRun test
                return! loop()
            }
            loop ())
    /// Start the given tests and print a sumary at the end
    let startTests tests = async {
        let! results =
            tests 
                |> Seq.map (fun t ->
                    let waiter = t.Finished |> Async.AwaitEvent
                    testRunner.Post t
                    waiter
                   )
                |> Async.Parallel
        let testTime = 
            results
                |> Seq.map (fun res -> res.Time)
                |> Seq.fold (fun state item -> state + item) TimeSpan.Zero
        let failed = 
            results
                |> Seq.map (fun res -> res.Failure) 
                |> Seq.filter (fun o -> o.IsSome)
                |> Seq.length
        let testCount = results.Length
        if failed > 0 then
            writerWrite ConsoleColor.DarkRed (sprintf "--- %d of %d TESTS FAILED (%A) ---" failed testCount testTime)
        else
            writerWrite ConsoleColor.DarkGray (sprintf "--- %d TESTS FINISHED SUCCESFULLY (%A) ---" testCount testTime)
        }

现在,仅当我使用一组特定的测试在网络上进行一些爬网时才会触发异常(有些失败,有些则没有,这很好):

#r @"Yaaf.GameMediaManager.Primitives.dll";; // See below
open TestRunner

let testLink link =
    Yaaf.GameMediaManager.EslGrabber.getMatchMembers link
    |> Async.Ignore

let tests = [
    // Some working links (links that should work)
    yield! 
      [ //"TestMatch", "http://www.esl.eu/eu/wire/anti-cheat/css/anticheat_test/match/26077222/"
        "MatchwithCheater", "http://www.esl.eu/de/csgo/ui/versus/match/3035028"
        "DeletedAccount", "http://www.esl.eu/de/css/ui/versus/match/2852106" 
        "CS1.6", "http://www.esl.eu/de/cs/ui/versus/match/2997440" 
        "2on2Versus", "http://www.esl.eu/de/css/ui/versus/match/3012767" 
        "SC2cup1on1", "http://www.esl.eu/eu/sc2/go4sc2/cup230/match/26964055/"
        "CSGO2on2Cup", "http://www.esl.eu/de/csgo/cups/2on2/season_08/match/26854846/"
        "CSSAwpCup", "http://www.esl.eu/eu/css/cups/2on2/awp_cup_11/match/26811005/"
        ] |> Seq.map (fun (name, workingLink) -> simpleTest (sprintf "TestEslMatches_%s" name) (fun o -> testLink workingLink))
    ]

startTests tests |> Async.Start;; // this will produce the Exception now and then

https://github.com/matthid/Yaaf.GameMediaManager/blob/core/src/Yaaf.GameMediaManager.Primitives/EslGrabber.fs是代码,你可以下载https://github.com/downloads/matthid/Yaaf。 GameMediaManager/GameMediaManager.%200.9.3.1.wireplugin(这基本上是一个重命名的 zip 存档)并将其解压缩以获取 Yaaf.GameMediaManager.Primitives.dll 二进制文件(您可以将其粘贴到 FSI 中,而不是在需要时下载,但随后您就有了引用 HtmlAgilityPack)

我可以使用 Microsoft (R) F# 2.0 Interactive, Build 4.0.40219.1 重现这一点。问题是异常不会总是(但经常)被触发,并且堆栈跟踪什么也没告诉我

System.Exception: multiple waiting reader continuations for mailbox
   bei <StartupCode$FSharp-Core>.$Control.-ctor@1860-3.Invoke(AsyncParams`1 _arg11)
   bei <StartupCode$FSharp-Core>.$Control.loop@413-40(Trampoline this, FSharpFunc`2 action)
   bei Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   bei Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   bei <StartupCode$FSharp-Core>.$Control.finishTask@1280[T](AsyncParams`1 _arg3, AsyncParamsAux aux, FSharpRef`1 firstExn, T[] results, TrampolineHolder trampolineHolder, Int32 remaining)
   bei <StartupCode$FSharp-Core>.$Control.recordFailure@1302[T](AsyncParams`1 _arg3, AsyncParamsAux aux, FSharpRef`1 count, FSharpRef`1 firstExn, T[] results, LinkedSubSource innerCTS, TrampolineHolder trampolineHolder, FSharpChoice`2 exn)
   bei <StartupCode$FSharp-Core>.$Control.Parallel@1322-3.Invoke(Exception exn)
   bei Microsoft.FSharp.Control.AsyncBuilderImpl.protectedPrimitive@690.Invoke(AsyncParams`1 args)
   bei <StartupCode$FSharp-Core>.$Control.loop@413-40(Trampoline this, FSharpFunc`2 action)
   bei Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   bei Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   bei <StartupCode$FSharp-Core>.$Control.-ctor@473-1.Invoke(Object state)
   bei System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   bei System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
   bei System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   bei System.Threading.ThreadPoolWorkQueue.Dispatch()
   bei System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

因为这将在我无法控制的工作线程上触发,所以这会使应用程序崩溃(不是 FSI,但这里也会显示异常)。

我找到了http://cs.hubfs.net/topic/Some/2/59152http://cs.hubfs.net/topic/None/59146但我不使用 StartChild 我不认为我是以某种方式同时从多个线程调用接收?

我的代码有什么问题还是这确实是一个错误?如果可能的话,我该如何解决这个问题?

我注意到在 FSI 中,当 Exception 被静默忽略时,所有测试都将按预期运行。我该怎么做?

编辑:我在修复失败的单元测试后注意到它会正常工作。但是我仍然不能用更小的代码库来重现这个。例如,我自己的失败测试。

谢谢,马蒂德

4

4 回答 4

2

我的感觉是限制将在 MailboxProcessor 本身而不是异步中。

老实说,我会谨慎使用扫描功能。我写了一篇关于使用它们的危险的博客文章。

是否可以使用标准接收机制而不是使用扫描功能来处理任务?

请注意,在异步内部使用了蹦床,以便在一定时间内重复使用同一线程以避免不必要的线程池使用(我认为这设置为 300),因此在调试时您可能会看到这种行为。

我会稍微不同地处理这个问题,将单独的组件分解为管道阶段而不是嵌套的异步块。我会创建一个主管组件和路由组件。

Supervisor 将负责初始测试并将消息发布到路由组件,该路由组件会将请求轮询到其他代理。任务完成后,他们可以发回给主管。

我意识到这并不能真正帮助解决当前代码中的问题,但我认为无论如何您都必须分解问题才能调试系统的异步部分。

于 2012-10-03T06:56:08.890 回答
0

Scan我确实相信/ TryScan/的 2.0 实现中存在一个错误Receive,可能会虚假地导致

multiple waiting reader continuations for mailbox 

例外; 我认为该错误现在已在 3.0 实现中得到修复。我没有仔细查看您的代码,以确保您在实现中一次只尝试接收一条消息,因此这也可能是您的代码中的错误。如果您可以针对 F# 3.0 进行尝试,很高兴知道这是否会消失。

于 2012-09-28T17:31:06.427 回答
0

一些笔记,以防有人发现我的经验有用(为了找到问题需要很长时间调试多个进程):

仅 50 个代理/邮箱就开始阻塞执行和吞吐量。有时在负载较轻的情况下,它可以处理第一轮消息,但任何像调用日志库这样重要的事情都会触发更长的延迟。

使用 VS IDE 中的 Threads/Parallel Stacks 窗口进行调试,运行时正在等待由 Trampoline.ExecuteAction 调用的 FSharpAsync.RunSynchronously -> CancellationTokenOps.RunSynchronously 调用的结果

我怀疑底层的 ThreadPool 正在限制启动(在第一次似乎运行正常之后)。这是一个很长的延迟。我正在使用代理在某些队列中进行次要计算进行序列化,同时允许主调度代理保持响应,因此延迟在 CLR 中的某个位置。

我发现在 try-with 中运行带有超时的 MailboxProcessor Receive 可以停止延迟,但这需要包含在异步块中以阻止程序的其余部分变慢,但延迟时间很短。尽管有点手忙脚乱,但对 F# MailboxProcessor 用于实现 actor 模型感到非常满意。

于 2013-09-19T15:20:34.623 回答
0

Sadly I never actually could reproduce this on a smaller code base, and now I would use NUnit with async test support instead of my own implementation. I used agents (MailboxProcessor) and asyncs in various projects since them and never encountered this again...

于 2015-01-01T20:55:04.000 回答