看看基于actor的模式和MailboxProcessor
基本上你可以把它想象成一个异步队列。如果您使用运行列表(以Async.StartChild或Async.StartAsTask开头)作为 MailboxProcessor 内循环的参数,您可以通过等待或 CancellationToken 优雅地处理关闭)
这是我放在一起的一个快速示例:
type Commands =
| RunJob of Async
| JobDone of int
| Quit of AsyncReplyChannel
type JobRunner() =
let processor =
MailboxProcessor.Start (fun inbox ->
let rec loop (nextId, jobs) = async {
let! cmd = inbox.Receive()
match cmd with
| Quit cb ->
if not (Map.isEmpty jobs)
then async {
do! Async.Sleep 100
inbox.Post (Quit cb)}
|> Async.Start
return! loop (nextId, jobs)
else
cb.Reply()
return ()
| JobDone id ->
return! loop (nextId, jobs |> Map.remove id)
| RunJob job ->
let runJob i = async {
do! job
inbox.Post (JobDone i)
}
let! child = Async.StartChild (runJob nextId)
return! loop (nextId+1, jobs |> Map.add nextId child)
}
loop (0, Map.empty))
member jr.PostJob(job) = processor.Post (RunJob job)
member jr.Quit() = processor.PostAndReply(fun cb -> Quit cb)
let postWaitJob (jobRunner : JobRunner) time =
let job = async {
do! Async.Sleep time
printfn "sleept for %d ms" time }
jobRunner.PostJob job
let testRun() =
let jr = new JobRunner()
printfn "starting jobs..."
[10..-1..1] |> List.iter (fun i -> postWaitJob jr (i*1000))
printfn "sending quit"
jr.Quit()
printfn "done!"
嗯......这里的编辑器有一些问题:当我使用管道返回运算符时它只会杀死很多代码...... grrr
简短说明:如您所见,我总是为内部循环提供下一个空闲作业 ID 和 Id->AsyncChild 作业的映射。(您当然可以实施其他/更好的解决方案 - 在此示例中不需要地图,但您可以使用命令“Cancell JobNr”或任何其他方式扩展)作业完成消息仅用于内部从该地图中删除作业退出只检查映射是否为空 - 如果不需要额外的工作并且邮箱处理器退出(return ()) - 如果它不为空,则启动一个新的 Async-Child,它只等待 100 毫秒,然后重新发送 Quit-Message RunJob相当简单 - 它只是将给定的作业与 JobDone 的帖子链接到 MessabeboxProcessor 并使用更新的值递归调用循环(nextId 是一个向上,并且新的作业映射到旧的 nextId)