4

我不确定我是否应该在这里或Hopac Wiki上问这个问题,或者这更像是一个设计问题,但这里有。我最近正在研究 F# 代理以使用与我的 Rx 相关的编码并遇到以下代码如何使用 MailboxProcessor 创建作业队列?并想,嘿,这不是最适合Hopac的东西吗?

现在我有几个关于如何实际实现可以在 Hopac 中启动和暂停的“作业队列”的问题(如在作业队列 post中)。

  • 问题:如何维护Hopac.Jobs队列,并能够在需要时暂停处理作业,或者也可以清除它?我可以像在作业队列帖子中那样实现一个显式队列),但除此之外,这个问题的一个可能答案也让我很感兴趣。
  • 然后是一个更一般性建议的问题:我计划制作数千个这样的代理,以便它们从 Rx 流中获取输入,或者每个代理可能有一个流,进行一些处理,然后将结果转发到异步接口(大多数可能它返回一个Taskor Task<T>) 可能需要时间来完成或失败(例如,接口后面有一个数据库)在这种情况下,我可能想要超时并重试。也许这可以通过 Rx 将Job结果排队以输出 Rx 流来更好地完成,但要点是Task基于接口交互、超时和失败时重试。问题:这会是一个好的场景吗?我的目标是在混合 .NET 代码代码库(即 C#、一些 VB.NET 和 F#)中更实际的用例,因此将不胜感激意见和实现点。

我从Post Mailbox 基准示例中得到启发并创建了以下示例(代码的真正要点是 JobProcessor)

//Install-Package Hopac 
open Hopac
open Hopac.Extensions
open Hopac.Infixes
open Hopac.Job.Infixes


[<CustomEquality; CustomComparison>]
type Message<'a> = 
    | Start
    | Stop
    | Pause
    | Job of ('a -> unit)

    override x.Equals(obj) = 
        match obj with
        | :? Message<'a> as fu -> 
            match x, fu with
            | Start, Start | Stop, Stop | Pause, Pause -> true
            | Job f1, Job f2 -> true
            | _, _ -> false
        | _ -> false

    override x.GetHashCode() = 
        match x with
        | Start -> 1
        | Stop -> 2
        | Pause -> 3
        | _ -> 4

    interface System.IComparable with
        member x.CompareTo yobj = 
            match yobj with
            | :? Message<'a> as y -> compare x y
            | _ -> invalidArg "yobj" "cannot compare value of different types"

    interface System.IComparable<Message<'a>> with
        member x.CompareTo(y) = compare x y

type JobProcessor() = 
    let mMb = mb()
    do 
        run <| job { 
                   do! Job.foreverServer (mMb |>> fun msg -> 
                                              //Should an explicit job queue be introduce here?
                                              match msg with
                                              | Start -> printfn "Start"
                                              | Stop ->  printfn "Stop"
                                              | Pause -> printfn "Pause"
                                              | Job(f) ->
                                                printfn "Job"
                                                f())
               }

    member x.QueueJob(f) =
        Mailbox.Global.send mMb f
        //mMb <<-+ Message.Start

[<EntryPoint>]
let main argv = 
    let jp = new JobProcessor()
    let jobFunc i () = (printfn "%i" i)
    jp.QueueJob(Job(jobFunc 100)) 
4

0 回答 0