4

对象的Cancel成员CancellationTokenSource“传达取消请求”,我认为这意味着它是触发后忘记并且不会等到取消完成(例如,所有异常处理程序都已运行)。这很好,但我需要等到一个未完成的异步完全取消后再创建另一个异步。有没有简单的方法来实现这一点?

4

2 回答 2

4

我认为没有任何直接的方法可以使用 F# 异步库中的标准库函数来做到这一点。当工作流(实际)取消时,我们最接近的操作Async.TryCancelled会运行回调,但是必须手动将回调中的消息发送到启动工作流的代码。

使用事件和我编写的 F# 异步扩展(也包含在 FSharpX 包中)的扩展相对容易解决 - 该扩展GuardedAwaitObservable可用于等待事件的发生(可由某些人立即触发)手术)。

以下Async.StartCancellable方法采用异步工作流并返回Async<Async<unit>>. 当您在外部工作流程上绑定时,它会启动参数(如Async.StartChild),当您在返回的内部工作流程上绑定时,它会取消计算并等待直到实际取消:

open System.Threading

module Async = 
  /// Returns an asynchronous workflow 'Async<Async<unit>>'. When called
  /// using 'let!', it starts the workflow provided as an argument and returns
  /// a token that can be used to cancel the started work - this is an
  /// (asynchronously) blocking operation that waits until the workflow
  /// is actually cancelled 
  let StartCancellable work = async {
    let cts = new CancellationTokenSource()
    // Creates an event used for notification
    let evt = new Event<_>()
    // Wrap the workflow with TryCancelled and notify when cancelled
    Async.Start(Async.TryCancelled(work, ignore >> evt.Trigger), cts.Token)
    // Return a workflow that waits for 'evt' and triggers 'Cancel'
    // after it attaches the event handler (to avoid missing event occurrence)
    let waitForCancel = Async.GuardedAwaitObservable evt.Publish cts.Cancel
    return async.TryFinally(waitForCancel, cts.Dispose) }

编辑按照乔恩的建议将结果包装TryFinally起来处理。CancellationTokenSource我认为这应该足以确保它被正确处理。

这是使用该方法的示例。该loop功能是我用于测试的简单工作流程。其余代码启动它,等待 5.5 秒然后取消它:

/// Sample workflow that repeatedly starts and stops long running operation
let loop = async {
  for i in 0 .. 9999 do
    printfn "Starting: %d" i
    do! Async.Sleep(1000)
    printfn "Done: %d" i }

// Start the 'loop' workflow, wait for 5.5 seconds and then
// cancel it and wait until it finishes current operation  
async { let! cancelToken = Async.StartCancellable(loop)
        printfn "started"
        do! Async.Sleep(5500)
        printfn "cancelling"
        do! cancelToken
        printfn "done" }
|> Async.Start

为了完整起见,带有来自 FSharpX 的必要定义的示例位于 F# snippets 上

于 2012-07-23T10:50:45.597 回答
4

考虑到易于使用的同步原语,这应该不难。我特别喜欢只写一次的“逻辑”变量:

type Logic<'T> =
    new : unit -> Logic<'T>
    member Set : 'T -> unit
    member Await : Async<'T>

很容易包装一个 Async 在完成时设置一个逻辑变量,然后等待它,例如:

type IWork =
    abstract member Cancel : unit -> Async<unit>

let startWork (work: Async<unit>) =
    let v = Logic<unit>()
    let s = new CancellationTokenSource()
    let main = async.TryFinally(work, fun () -> s.Dispose(); v.Set())
    Async.Start(main, s.Token)
    {
        new IWork with
            member this.Cancel() = s.Cancel(); v.Await
    }

逻辑变量的可能实现可能是:

type LogicState<'T> =
    | New
    | Value of 'T
    | Waiting of ('T -> unit)

[<Sealed>]
type Logic<'T>() =
    let lockRoot = obj ()
    let mutable st = New
    let update up =
        let k =
            lock lockRoot <| fun () ->
                let (n, k) = up st
                st <- n
                k
        k ()

    let wait (k: 'T -> unit) =
        update <| function
            | New -> (Waiting k, ignore)
            | Value value as st -> (st, fun () -> k value)
            | Waiting f -> (Waiting (fun x -> f x; k x), ignore)

    let await =
        Async.FromContinuations(fun (ok, _, _) -> wait ok)

    member this.Set<'T>(value: 'T) =
        update <| function
            | New -> (Value value, ignore)
            | Value _ as st -> (st, ignore)
            | Waiting f as st -> (Value value, fun () -> f value)

    member this.Await = await
于 2012-07-23T14:47:14.327 回答