1

我的情况类似于以下内容:

let mutable stopped = false

let runAsync() = async {
    while not stopped do
        let! item = fetchItemToProcessAsync
        match item with
        | Some job -> job |> runJobAsync |> Async.Start
        | None -> do! Async.Sleep(1000)
}

let run() = Async.Start runAsync
let stop() =
    stopped <- true

现在,当调用 stop 方法时,我必须停止从数据库中读取更多项目,并等待当前开始的项目完成,然后再从该函数返回。

实现这一目标的最佳方法是什么?我正在考虑使用计数器(使用互锁的 API)并在计数器达到 0 时从停止方法返回。

如果有其他方法可以做到这一点,我将不胜感激。我有一种感觉,我可以在这里使用代理,但我不确定是否有任何可用的方法可以使用代理完成此操作,或者我是否仍需要编写自定义逻辑来确定作业已完成执行。

4

2 回答 2

1

看看基于actor的模式和MailboxProcessor

基本上你可以把它想象成一个异步队列。如果您使用运行列表(以Async.StartChildAsync.StartAsTask开头)作为 MailboxProcessor 内循环的参数,您可以通过等待或 CancellationToken 优雅地处理关闭)

这是我放在一起的一个快速示例:


type Commands = 
    | RunJob of Async
    | JobDone of int
    | Quit of AsyncReplyChannel

type JobRunner() =
    let processor =
        MailboxProcessor.Start (fun inbox ->
            let rec loop (nextId, jobs) = async {
                let! cmd = inbox.Receive()
                match cmd with
                | Quit cb ->
                    if not (Map.isEmpty jobs) 
                    then async {
                            do! Async.Sleep 100
                            inbox.Post (Quit cb)}
                        |> Async.Start
                        return! loop (nextId, jobs)
                    else 
                        cb.Reply()
                        return ()
                | JobDone id ->
                    return! loop (nextId, jobs |> Map.remove id)
                | RunJob job ->
                    let runJob i = async {
                        do! job
                        inbox.Post (JobDone i)
                    }
                    let! child = Async.StartChild (runJob nextId)
                    return! loop (nextId+1, jobs |> Map.add nextId child)
            }
            loop (0, Map.empty))
    member jr.PostJob(job) = processor.Post (RunJob job)
    member jr.Quit() = processor.PostAndReply(fun cb -> Quit cb)

let postWaitJob (jobRunner : JobRunner) time =
    let job = async {
        do! Async.Sleep time
        printfn "sleept for %d ms" time }
    jobRunner.PostJob job

let testRun() =
    let jr = new JobRunner()
    printfn "starting jobs..."
    [10..-1..1] |> List.iter (fun i -> postWaitJob jr (i*1000))
    printfn "sending quit"
    jr.Quit()
    printfn "done!"

嗯......这里的编辑器有一些问题:当我使用管道返回运算符时它只会杀死很多代码...... grrr

简短说明:如您所见,我总是为内部循环提供下一个空闲作业 ID 和 Id->AsyncChild 作业的映射。(您当然可以实施其他/更好的解决方案 - 在此示例中不需要地图,但您可以使用命令“Cancell JobNr”或任何其他方式扩展)作业完成消息仅用于内部从该地图中删除作业退出只检查映射是否为空 - 如果不需要额外的工作并且邮箱处理器退出(return ()) - 如果它不为空,则启动一个新的 Async-Child,它只等待 100 毫秒,然后重新发送 Quit-Message RunJob相当简单 - 它只是将给定的作业与 JobDone 的帖子链接到 MessabeboxProcessor 并使用更新的值递归调用循环(nextId 是一个向上,并且新的作业映射到旧的 nextId)

于 2011-08-18T06:45:21.287 回答
1

在 fssnip.net 上查看此代码段。这是您可以使用的通用作业处理器。

于 2011-08-18T13:07:21.647 回答