我不确定我是否应该在这里或Hopac Wiki上问这个问题,或者这更像是一个设计问题,但这里有。我最近正在研究 F# 代理以使用与我的 Rx 相关的编码并遇到以下代码如何使用 MailboxProcessor 创建作业队列?并想,嘿,这不是最适合Hopac的东西吗?
现在我有几个关于如何实际实现可以在 Hopac 中启动和暂停的“作业队列”的问题(如在作业队列 post中)。
- 问题:如何维护Hopac.Jobs队列,并能够在需要时暂停处理作业,或者也可以清除它?我可以像在作业队列帖子中那样实现一个显式队列),但除此之外,这个问题的一个可能答案也让我很感兴趣。
- 然后是一个更一般性建议的问题:我计划制作数千个这样的代理,以便它们从 Rx 流中获取输入,或者每个代理可能有一个流,进行一些处理,然后将结果转发到异步接口(大多数可能它返回一个
Task
orTask<T>
) 可能需要时间来完成或失败(例如,接口后面有一个数据库)在这种情况下,我可能想要超时并重试。也许这可以通过 Rx 将Job
结果排队以输出 Rx 流来更好地完成,但要点是Task
基于接口交互、超时和失败时重试。问题:这会是一个好的场景吗?我的目标是在混合 .NET 代码代码库(即 C#、一些 VB.NET 和 F#)中更实际的用例,因此将不胜感激意见和实现点。
我从Post Mailbox 基准示例中得到启发并创建了以下示例(代码的真正要点是 JobProcessor)
//Install-Package Hopac
open Hopac
open Hopac.Extensions
open Hopac.Infixes
open Hopac.Job.Infixes
[<CustomEquality; CustomComparison>]
type Message<'a> =
| Start
| Stop
| Pause
| Job of ('a -> unit)
override x.Equals(obj) =
match obj with
| :? Message<'a> as fu ->
match x, fu with
| Start, Start | Stop, Stop | Pause, Pause -> true
| Job f1, Job f2 -> true
| _, _ -> false
| _ -> false
override x.GetHashCode() =
match x with
| Start -> 1
| Stop -> 2
| Pause -> 3
| _ -> 4
interface System.IComparable with
member x.CompareTo yobj =
match yobj with
| :? Message<'a> as y -> compare x y
| _ -> invalidArg "yobj" "cannot compare value of different types"
interface System.IComparable<Message<'a>> with
member x.CompareTo(y) = compare x y
type JobProcessor() =
let mMb = mb()
do
run <| job {
do! Job.foreverServer (mMb |>> fun msg ->
//Should an explicit job queue be introduce here?
match msg with
| Start -> printfn "Start"
| Stop -> printfn "Stop"
| Pause -> printfn "Pause"
| Job(f) ->
printfn "Job"
f())
}
member x.QueueJob(f) =
Mailbox.Global.send mMb f
//mMb <<-+ Message.Start
[<EntryPoint>]
let main argv =
let jp = new JobProcessor()
let jobFunc i () = (printfn "%i" i)
jp.QueueJob(Job(jobFunc 100))