2

我正在尝试解决以下问题。我有一些实时运行的代理,具有几毫秒的大心跳,因此它们处理的操作顺序大多是确定性的(因为消息处理不是瓶颈)。

现在,我正在对不再有心跳的系统进行大量模拟(否则将需要几个世纪)——但我需要确保操作顺序得以保留。为此,我采用了以下解决方案:模拟器确保每个代理都处理了他的消息队列,方法是发布一个虚拟同步消息并在等待答案时阻塞。这确实适用于我的应用程序,但它所花费的时间并不直观 - 因为单线程实现会快一个数量级(我猜 - x 100 ish - 虽然我没有测试过)。

我已经隔离了一个显示问题的小测试,甚至尝试使用另一个库 akka.net

type Greet = 
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2  

[<EntryPoint>]
let main argv =
    let system = System.create "MySystem" <| Configuration.load()    
    let greeter = spawn system "greeter" <| fun mailbox ->
        let rec loop() = actor {
            let! msg = mailbox.Receive()
            let sender = mailbox.Sender()
            match msg with
                | Greet who -> () // printf "Hello, %s!\n" who
                | Hello2 -> sender.Tell(true)
                | _ -> ()
            return! loop()
            }
        loop()

    let greeterF =
        MailboxProcessor.Start
            (fun inbox ->                
                async {
                    while true do
                        let! msg = inbox.Receive()
                        match msg with
                        | Greet who -> () // printf "Hello, %s!\n" who
                        | Hello reply -> reply.Reply true
                        | _ -> ()
                    }
            )

    let n = 1000000

    let t1 = System.Diagnostics.Stopwatch()
    t1.Start()
    for i = 1 to n do
        let rep = greeterF.PostAndReply(fun reply -> (Hello reply)) |> ignore
        ()

    printfn "elapsed Mailbox:%A" t1.ElapsedMilliseconds

    t1.Restart()

    for i = 1 to n do        
        let res = greeter.Ask (Hello2)
        let rr = res.Result
        ()

    printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
    System.Console.ReadLine () |> ignore

    0

基本上,对于仅仅 100 万次同步,两者都需要大约 10 秒 - 而不是计算所涉及的任何内容,这是......不幸的。

我想知道是否有人遇到过同样的问题,是否有任何方法可以关闭开销,强制所有内容以单线程模式运行......这比停用所有 cpu 更好,但在 bios 中只有 1 - 或在没有代理的情况下编写整个系统的克隆。

任何帮助都非常感谢。

4

3 回答 3

4

Akka.NET 版本在这里变慢的原因是您与演员的沟通方式:

main process    Task     FutureActorRef  !!ThreadPool!!   greeter
    Ask ---------------------->
                              Tell-----------> 
                                             MailboxRun ----->
                                 (greeter mailbox is empty)  |                 
                               <--------------------------Tell 
                  <--Complete task
    <----------.Result
  1. 对于每次迭代,都会创建一个 TPL 任务

  2. 然后向迎宾员发送一条消息

  3. 然后主进程在等待响应返回时阻塞。

  4. 迎宾员回复,这反过来又完成了里面的任务FutureActorRef

冲洗并重复.. 这种设计将导致 Akka.NET 为每条消息启动和停止欢迎程序“邮箱运行”,因为邮箱队列在每次迭代时变为空。这会导致每个传递的单个消息的线程池调度。

这有点像进入您的汽车,将踏板踩到金属上,然后突然停下并走出汽车,然后再次重复该过程。这不是快速旅行的一种非常有效的方式。

@Aaronontheweb 的建议只有在您解决代码中的上述问题时才会生效。邮箱需要能够不断地从内部队列中挑选消息来批量处理消息以实现全吞吐量。

相反,将生产者与消费者分开。创建一个 Actor 来监听来自欢迎者的响应。一旦该参与者处理了您的 1000000 条消息,让该参与者将 WorkCompleted 消息发送回消费者。

[编辑] 我自己试了一下,我不知道 F#,所以它可能不完全地道:)

open Akka
open Akka.Actor
open Akka.FSharp

type Greet = 
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2 

type Consume =
| Response
| SetSender

[<EntryPoint>]
let main argv =

    let system = System.create "MySystem" <| Configuration.load()    
    let greeter = spawn system "greeter" <| fun mailbox ->
        let rec loop() = actor {
            let! msg = mailbox.Receive()
            let sender = mailbox.Sender()
            match msg with
                | Greet who -> () // printf "Hello, %s!\n" who
                | Hello2 -> sender.Tell(Response)
                | _ -> ()
            return! loop()
            }
        loop()

    let consumer = spawn system "consumer" <| fun mailbox ->
        let rec loop(count,sender : IActorRef) = actor {
            if count = 1000000 then sender.Tell(true)
            let! msg = mailbox.Receive()
            match msg with
            | Response -> return! loop(count+1,sender)
            | SetSender -> return! loop(count,mailbox.Sender())

        }  
        loop(0,null)      

    let n = 1000000

    let t1 = System.Diagnostics.Stopwatch()
    t1.Start()   
    for i = 1 to n do        
        greeter.Tell(Hello2,consumer)

    let workdone = consumer.Ask SetSender
    workdone.Wait()

    printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
    System.Console.ReadLine () |> ignore

    0

我更新了您的代码以使用单独的使用者来处理参与者响应,然后在处理完所有回复后回复。

通过这样做,您在我的机器上的处理时间现在降至 650 毫秒。

如果你想要更好的吞吐量,你需要让更多的参与者参与进来,以实现更多的并行化。

我不确定这是否有助于您的特定情况

于 2015-04-15T23:53:51.823 回答
2

这是一个稍微修改的MailboxProcessor版本:

module MBPAsync =
  type Greet = 
   | Greet of string
   | Hello of AsyncReplyChannel<bool>

  let run n =
    let timer = Stopwatch.StartNew ()

    use greeter =
      MailboxProcessor.Start <| fun inbox -> async {
        while true do
          let! msg = inbox.Receive()
          match msg with
           | Greet who -> () // printf "Hello, %s!\n" who
           | Hello reply -> reply.Reply true
      }

    Async.RunSynchronously <| async {
      for i = 1 to n do
        do! Async.Ignore (greeter.PostAndAsyncReply Hello)
    }

    let elapsed = timer.Elapsed
    printfn "%A" elapsed

此处的不同之处在于,此版本使用PostAndAsyncReply并将计算保持在异步工作流程中。在我的快速测试中,这似乎比使用快得多PostAndReply,但是 YMMV.

我从上面的 MBP 版本中得到的时间大致是这样的:

> MBPAsync.run 1000000 ;;
00:00:02.6883486
val it : unit = ()

之前的评论提到了我的 Hopac 库。这是使用 Hopac 的优化版本:

module Hop =
  type Greet = 
   | Greet of string
   | Hello of IVar<bool>

  let run n =
    let timer = Stopwatch.StartNew ()

    let greeterCh = ch ()
    do greeterCh >>= function
          | Greet who -> Job.unit ()
          | Hello reply -> reply <-= true
       |> Job.forever
       |> server

    Job.forUpToIgnore 1 n <| fun _ ->
        let reply = ivar ()
        greeterCh <-- Hello reply >>.
        reply
    |> run

    let elapsed = timer.Elapsed
    printfn "%A" elapsed

我从上面的 Hopac 版本中得到的时间大致是这样的:

> Hop.run 1000000 ;;
00:00:00.1088768
val it : unit = ()
于 2015-04-16T05:09:11.750 回答
1

我不是 F# 开发人员,但我是 Akka.NET 的核心开发人员。针对您的场景的一些想法:

  1. 如果你只使用一个演员来完成这项工作,你可以尝试使用PinnedDispatcher- 这样演员一直在自己的专用线程上运行。这将为您节省不必要的上下文切换开销。

  2. 为此,您还可以将邮箱的吞吐量设置为PinnedDispatcher比正常设置高得多。即,将吞吐量值设置为 10000(或其他值)而不是正常的 25。假设您的邮箱内容大量增长,这应该可以为您节省邮箱同步开销。

您的调度程序配置可能如下所示:

 my-pinned-dispatcher {
      type = PinnedDispatcher
      throughput = 1000 #your mileage may vary
 }

然后配置一个actor来使用它

C# 流畅的接口

var myActor = myActorSystem.ActorOf(Props.Create<FooActor>()
.WithDispatcher("my-pinned-dispatcher");

配置

akka.actor.deployment{
   /greeter{
     dispatcher = my-pinned-dispatcher
   }
}

这两个选项都可以通过 App.config 或 Web.config 中的 HOCON 进行配置,或者您可以使用Props类上的流利界面来执行此操作。另外值得注意的是:目前固定调度程序存在一个错误,但这应该会在我们下周发布的下一个维护版本(v1.0.1)中修复。

您的里程可能会有所不同,但这是我会尝试的 - 基本上它只是旨在帮助减少围绕单个演员的争用和开销。

于 2015-04-15T20:20:23.117 回答