0

我正在尝试运行一些 MailboxProcessor 测试,并且似乎邮箱 Scan() 失败并出现“System.Exception:邮箱的多个等待读者继续”。Async.Start 和 Async.StartImmediate 等会发生这种情况(Async.RunSynchronously 也不会工作,因为在初始客户之后只有一个处理器并且没有客户)。

这是演示代码,它以交互方式工作:

#if INTERACTIVE
#r "../packages/FSharp.Data.2.0.4/lib/net40/FSharp.Data.dll"
#endif
open System
open FSharp.Data
let random = new Random()
let data = FreebaseData.GetDataContext()
let customerNames = data.Commons.Computers.``Computer Scientists``
let nameAmount = customerNames |> Seq.length
// ----

type Customer() =
    let person = customerNames |> Seq.nth (random.Next nameAmount)
    member x.Id = Guid.NewGuid()
    member x.Name = person.Name
    member x.RequiredTime = random.Next(10000)

type Barber(name) =
    member x.Name = name

type ``Possible actions notified to barber`` = 
| CustomerWalksIn of Customer

let availableCustomers = new MailboxProcessor<``Possible actions notified to barber``>(fun c -> async { () })

let createBarber name = 
    Console.WriteLine("Barber " + name + " takes inital nap...")
    let rec cutSomeHairs () = 
        async{
            do! availableCustomers.Scan(function 
                | CustomerWalksIn customer ->
                    async {
                        Console.WriteLine("Barber " + name + " is awake and started cutting " + customer.Name + "'s hair.")
                        // exception also happen with Threading.Thread.Sleep()
                        do! Async.Sleep customer.RequiredTime
                        Console.WriteLine("Barber " + name + " finnished cutting " + customer.Name + "'s hair. Going to sleep now...")
                    } |> Some)
            do! cutSomeHairs ()
            }
    cutSomeHairs() |> Async.StartImmediate

availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)

createBarber "Tuomas";
createBarber "Seppo";

availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)

...运行一段时间后我得到的堆栈跟踪是:

System.Exception: multiple waiting reader continuations for mailbox
   at <StartupCode$FSharp-Core>.$Control.-ctor@2136-3.Invoke(AsyncParams`1 _arg1)
   at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>.$Control.Sleep@1508-1.Invoke(Object state)
   at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.TimerQueueTimer.CallCallback()
   at System.Threading.TimerQueueTimer.Fire()
   at System.Threading.TimerQueue.FireNextTimers()
   at System.Threading.TimerQueue.AppDomainTimerCallback()
Stopped due to error

或相同的没有线程:

System.Exception: multiple waiting reader continuations for mailbox
   at <StartupCode$FSharp-Core>.$Control.-ctor@2136-3.Invoke(AsyncParams`1 _arg1)
   at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>.$Control.-ctor@511.Invoke(Object state)
Stopped due to error
4

2 回答 2

1

an的ReceiveandScan方法MailboxProcessor只能从代理的主体中调用。引用MSDN 文档

此方法用于代理体内。对于每个代理,最多可以有一个并发读取器处于活动状态,因此对 Receive、TryReceive、Scan 或 TryScan 的并发调用最多可能处于活动状态。扫描器函数的主体在其执行期间被锁定,但在异步工作流执行之前该锁定被释放。

因此,您需要以不同的方式构建代码。我没有详细的答案,但听起来我关于使用代理实现阻塞队列的文章在这里可以提供帮助。

于 2014-03-31T15:22:31.120 回答
1

正如 Tomas 已经指出的那样,MailboxProcessor直接只允许单个阅读器,而在异步系统中解决此问题的一种方法是编写自己的队列或邮箱类型。然而,Tomas 的文章没有提到的一件事是,实现新通信原语的另一种方法是使用Async.FromContinuations而不是MailboxProcessorand AsyncReplyChannel

使用的主要优点Async.FromContinuations是您可以更直接地访问异步机制,并且不必在 和 施加的限制内MailboxProcessor工作AsyncReplyChannel。主要缺点是您需要自己使队列或邮箱线程安全。

作为一个具体的例子,A​​nton Tayanovskyy 的博客文章Making Async 5x Faster包含一个使用Async.FromContinuations.

于 2014-07-19T10:23:20.517 回答