1

我有一个 F# 项目,它将一些文件加载​​到外部子系统,然后使用表依赖来等待将某些行添加到表中作为副作用。

下面的类型中使用表依赖来监视数据库更改。当添加/更改/任何行时,它会触发自定义事件:

// just using this type for the RecordChangedEvent to marshal the id we want into something
type AccountLoaded() = 
    let mutable someId = ""

    // this property name matches the name of the table column (SomeId)
    member this.SomeId 
        with get () = someId
        and set (value) = someId <- value

// AccountLoadWatcher
type AccountLoadWatcher() = 
    let mutable _tableDependency = null
    let event = new Event<_>()

    interface IDisposable with
        member this.Dispose() = 
            _tableDependency.Stop()
            _tableDependency.Dispose()

    // custom event we can send when an account is loaded

    [<CLIEvent>]
    member this.AccountLoaded = event.Publish

    member private this.NotifyAccountLoaded(sender : RecordChangedEventArgs<AccountLoaded>) = 
        let accountLoaded = sender.Entity
        event.Trigger(accountLoaded.SomeId)

    member this.Watch() = 
        _tableDependency <- DbLib.getTableDependency "dbo" "AccountTable" 
                                null
        _tableDependency.OnChanged.Add(this.NotifyAccountLoaded)
        _tableDependency.Start()

我想要做的是获取上面的对象,然后等待所有带有我关心的 id 的行被加载。到目前为止,我所拥有的是:

let waitForRows(csvFileRows) =
  let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows

  let mutable collected = Set.empty
  let isInSet id = Set.contains id idsToWaitFor
  let notDone = not <| (Set.difference idsToWaitFor collected = Set.empty)
  let accountLoadedHandler id = 
      collected <- collected.Add id
      printfn "Id loaded %s, waiting for %A\n" id (Set.difference idsToWaitFor collected)
  loadToSubsystem csvFileRows |> ignore

  // wait for all the watcher events; filtering each event object for ids we care about
     watcher.AccountLoaded
           |> Observable.takeWhile (fun _ -> notDone) 
           |> Observable.filter (fun e -> isInSet e) 
           |> Observable.subscribe accountLoadedHandler
           |> ignore

  doMoreWork()

但这只是继续做更多工作,而无需等待我上面需要的所有事件。

我需要使用任务还是异步?F# 代理?

4

1 回答 1

0

鉴于您Observable.takeWhile在示例中使用,我假设您正在使用FSharp.Control.Reactive包装器来访问所有反应式组合器。

您的方法有一些好主意,例如使用takeWhile等到您收集所有 ID,但是使用突变是非常不幸的 - 由于可能的竞争条件,这样做甚至可能不安全。

一个不错的替代方法是使用各种scan函数之一在事件发生时收集状态。您可以使用Observable.scanInit以空集开始并添加所有ID;然后Observable.takeWhile继续接受事件,直到您拥有所有正在等待的 ID。要实际等待(和阻止),您可以使用Observable.wait. 像这样的东西:

let waitForRows(csvFileRows) =
  let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows

  let finalCollectedIDs = 
    watcher.AccountLoaded
    |> Observable.scanInit Set.empty (fun collected id -> Set.add id collected) 
    |> Observable.takeWhile (fun collected -> not (Set.isSubset idsToWaitFor co llected))
    |> Observable.wait

  printfn "Completed. Final collected IDs are: %A" finalCollectedIDs 
于 2018-11-05T23:04:20.847 回答