0

我正在努力寻找正确的方法来处理 F# 中运行时间更长的请求/作业。

要求:

  • 一个作业由多个步骤组成(需要按顺序执行)。
  • 一项工作可能需要几分钟,比如说最多 10 分钟。
  • 一个步骤可能涉及 IO 操作和等待时间,例如,直到由该步骤创建的文件被其他应用程序处理然后返回。
  • 可能是某个步骤失败或达到了作业应提前结束的状态。
  • 应该可以并行处理多个作业。
  • 作业由用户请求启动/添加。
  • 我希望能够根据请求跟踪作业的状态(当前步骤、先前步骤的结果)。

当前解决方案:

目前,我使用 FileSystemWatcher 来监视带有作业请求的“收件箱”。请求会导致将作业添加到由代理 (MailboxProcessor) 管理的列表中。一旦将作业添加到列表中,就会启动一个新线程(let t = new Thread(...) -> t.Start())并且对线程的引用与作业参数一起保存(并在列表中)。在线程中,所有步骤都是按顺序执行的。这样我就可以跟踪作业状态(检查线程是否还活着)并同时处理作业。

但是,这似乎不允许我获取有关作业/线程中步骤的信息。

所需的解决方案:

另外,我想从 FileSystemWatcher 切换到基于 Suave 的 REST API。似乎我面临的问题(并行作业执行和收集有关步骤的信息,根据请求通信状态)在两个世界中都是相同的(由 FileSystemWatcher 事件或 REST API 触发的请求),但我使用 REST 方法来解释我的所需的功能:

我希望能够启动作业(POST)(响应:作业已接受,作业 ID = xyz),检查作业的状态(带有作业 ID 的 GET,包含步骤结果和当前步骤的响应)以及处理是否done 获取作业的结果(带有作业 ID 的 GET)。

至少这种设置看起来很方便,可以满足当前的需求。

任何人都可以通过向我指出处理此类要求的正确工具/方法来帮助我吗?我完全偏离了正确的方向吗?

我希望这个解释也能被我以外的其他人理解。

谢谢和最好的问候 cil

4

1 回答 1

0

如果我要解决这样的一组要求,那么我要查看的工具是:

使用 .NET 核心工作者服务,有一个 C# 模板dotnet new worker -lang c# -o CSharpService 创建一个 C# 长时间运行的程序。可以在 F# 中创建相同的长时间运行的服务。

创建项目如下:

dotnet new console -lang F# -o FsharpService
cd FsharpService
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Microsoft.Extensions.Hosting.WindowsServices
dotnet add package System.Net.NameResolution

然后将 Program.fs 替换为:

open System
open System.Threading.Tasks
open Microsoft.Extensions.DependencyInjection
open Microsoft.Extensions.Hosting
open Microsoft.Extensions.Logging

type Worker(logger : ILogger<Worker>) =
    inherit BackgroundService()
    let _logger = logger
    override bs.ExecuteAsync stoppingToken =
        let f : Async<unit> = async {
            while not stoppingToken.IsCancellationRequested do
                _logger.LogInformation("Worker running at: {time}", DateTime.Now)
                do! Async.Sleep(1000)
        }
        Async.StartAsTask f :> Task

let CreateHostBuilder argv : IHostBuilder =
    let builder = Host.CreateDefaultBuilder(argv)
    builder.UseWindowsService()
        .ConfigureServices(fun hostContext services -> services.AddHostedService<Worker>() 
                                                        |> ignore<IServiceCollection>)
[<EntryPoint>]
let main argv =
    let hostBuilder = CreateHostBuilder argv
    hostBuilder.Build().Run()
    0 // return an integer exit code

最后,如果您在 Windows 上构建注册并启动服务:

dotnet publish -r win-x64 -c Release /p:PublishSingleFile=true /p:Trimmed=true -o "./published"
sc create FsharpService binPath= "%cd%\published\FsharpService.exe"
services.msc

我的博客上有更多详细信息。

我要看的另一种技术是Task Parallel Library。这允许您构建一个工作流,其中部分或全部可以是并行的,但需要注意块之间的并发和消息传递。从 F# 和模型中调用很简单,其中每个模块都有一个输入类型和(在某些情况下)一个输出类型适合 F#设计类型方法。

这是我在第一次查看 TPL 和 F# 时放在一起的一个简单示例。注意:我没有机会运行它并确认它仍然有效,如果您尝试使用它,您还需要修改 #r 命令以在您的机器上工作。

#r @"System.Threading.Tasks.Dataflow.dll"

open System
open System.IO
open System.Threading.Tasks.Dataflow

let buildPropagateLinkOption () =
    let mutable linkOption = new DataflowLinkOptions()
    linkOption.PropagateCompletion <- true
    linkOption

let buildParallelExecutionOption noThreads =
    let mutable executionOption = new ExecutionDataflowBlockOptions()
    executionOption.MaxDegreeOfParallelism <- noThreads
    executionOption

type TPLRequest = {
    path:string ;
    filter:string ;
}

type TPLFile = {
    fileName : string ;
}

type TPLResponse = {
    fileName : string ;
    size : int64 ;
}

let b1Impl (inReq:TPLRequest) : TPLFile seq = 
    printfn "Directory %s %A" inReq.path System.Threading.Thread.CurrentThread.ManagedThreadId
    Directory.EnumerateFiles(inReq.path, inReq.filter) |> Seq.map(fun x -> {fileName = x})

let b2Impl (inReq:TPLFile) : TPLResponse =
    let fInfo = FileInfo(inReq.fileName)
    printfn "File %s %A" inReq.fileName System.Threading.Thread.CurrentThread.ManagedThreadId
    {fileName = inReq.fileName; size = fInfo.Length }

let b3Impl (inReq:TPLResponse) =
    printfn "%s %d %A" inReq.fileName inReq.size System.Threading.Thread.CurrentThread.ManagedThreadId

let buildFlow () =
    let parallelExecutionOption = buildParallelExecutionOption 4
    let b1 = new TransformManyBlock<TPLRequest,TPLFile>((fun x -> b1Impl x),parallelExecutionOption)
    let b2 = new TransformBlock<TPLFile,TPLResponse>((fun x -> b2Impl x),parallelExecutionOption)
    let b3 = new ActionBlock<TPLResponse>((fun x ->b3Impl x),parallelExecutionOption)
    let propagateLinkOption = buildPropagateLinkOption ()
    b1.LinkTo(b2,propagateLinkOption) |> ignore<IDisposable>
    b2.LinkTo(b3,propagateLinkOption) |> ignore<IDisposable>
    b1

let runFlow () =
    let flow = buildFlow ()
    flow.Post {path="C:\\temp"; filter = "*.txt"} |> ignore<bool>
    flow.Post {path="C:\\temp"; filter = "*.zip"} |> ignore<bool>
    flow.Complete()
    flow.Completion.Wait()
    ()

runFlow ()
于 2020-03-27T10:18:33.097 回答