16

我想知道这是否是一个太宽泛的问题,但最近我让自己遇到了一段代码,我想确定如何从 C# 转换为正确的 F#。旅程从这里开始 (1)(TPL-F# 交互的原始问题),并从这里继续 (2)(我正在考虑将一些示例代码翻译成 F#)。

示例代码太长,无法在此处重现,但有趣的函数是ActivateAsyncRefreshHubsAddHub。特别有趣的地方是

  1. AddHub有一个签名private async Task AddHub(string address)
  2. RefreshHubs在循环中调用AddHub并收集 的列表tasks,然后它在最后等待它await Task.WhenAll(tasks),因此返回值与它的签名匹配private async Task RefreshHubs(object _)
  3. RefreshHubsActivateAsyncas调用await RefreshHubs(null),然后最后有一个await base.ActivateAsync()与函数签名匹配的调用public override async Task ActivateAsync()

问题:

将此类函数签名正确转换为仍保持接口和功能并尊重默认自定义调度程序的 F# 的正确转换是什么?而且我也不太确定这个“F#中的异步/等待”。至于如何“机械地”做到这一点。:)

原因是在链接“here (1)”中似乎存在问题(我尚未验证这一点),因为 F# 异步操作不尊重 (Orleans) 运行时设置的自定义协作调度程序。此外,这里声明 TPL 操作会从调度程序中逃脱并进入任务池,因此禁止使用它们。

我能想到的一种方法是使用 F# 函数,如下所示

//Sorry for the inconvenience of shorterned code, for context see the link "here (1)"...
override this.ActivateAsync() =
    this.RegisterTimer(new Func<obj, Task>(this.FlushQueue), null, TimeSpan.FromMilliseconds(100.0), TimeSpan.FromMilliseconds(100.0)) |> ignore

    if RoleEnvironment.IsAvailable then
        this.RefreshHubs(null) |> Async.awaitPlainTask |> Async.RunSynchronously
    else
        this.AddHub("http://localhost:48777/") |> Async.awaitPlainTask |> Async.RunSynchronously

    //Return value comes from here.
    base.ActivateAsync()

member private this.RefreshHubs(_) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //The return value is Task.
    //In the C# version the AddHub provided tasks are collected and then the
    //on the last line there is return await Task.WhenAll(newHubAdditionTasks) 
    newHubs |> Array.map(fun i -> this.AddHub(i)) |> Task.WhenAll

member private this.AddHub(address) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //In the C# version:
    //...
    //hubs.Add(address, new Tuple<HubConnection, IHubProxy>(hubConnection, hub))
    //} 
    //so this is "void" and could perhaps be Async<void> in F#... 
    //The return value is Task.
    hubConnection.Start() |> Async.awaitTaskVoid |> Async.RunSynchronously
    TaskDone.Done

该功能startAsPlainTaskSacha Barber提供另一个有趣的选择可能在这里

module Async =
    let AwaitTaskVoid : (Task -> Async<unit>) =
        Async.AwaitIAsyncResult >> Async.Ignore

<编辑:我刚刚注意到Task.WhenAll也需要等待。但是正确的方法是什么?呃,该睡觉了(一个糟糕的双关语)......

<编辑 2:在Codeplex 的此处 (1)(TPL-F# 交互的原始问题)提到 F# 使用同步上下文将工作推送到线程,而 TPL 没有。现在,这是一个合理的解释,我觉得(尽管无论自定义调度程序如何,我在正确翻译这些片段方面仍然存在问题)。一些有趣的附加信息可能来自

我想我需要在这种情况下提到Hopac,作为一个有趣的切线,并且还提到我在接下来的 50 多个小时左右是遥不可及的,以防我所有的交叉发布都失控了。

<edit 3 : Danielsvick在评论中给出了使用自定义任务生成器的好建议。Daniel 提供了指向已在FSharpx中定义的链接。

查看源代码,我看到带有参数的接口定义为

type TaskBuilder(?continuationOptions, ?scheduler, ?cancellationToken) =
    let contOptions = defaultArg continuationOptions TaskContinuationOptions.None
    let scheduler = defaultArg scheduler TaskScheduler.Default
    let cancellationToken = defaultArg cancellationToken CancellationToken.None

如果要在奥尔良使用它,看起来TaskScheduler应该按照这里TaskScheduler.Current的文档

Orleans 有它自己的任务调度器,它提供了在 grain 中使用的单线程执行模型。运行任务时使用 Orleans 调度程序而不是 .NET 线程池,这一点很重要。

如果你的grain代码需要创建一个子任务,你应该使用Task.Factory.StartNew:

await Task.Factory.StartNew(() =>{ /* 逻辑 */ });

该技术将使用当前任务调度程序,即 Orleans 调度程序。

您应该避免使用 Task.Run,​​它始终使用 .NET 线程池,因此不会在单线程执行模型中运行。

看起来TaskScheduler.CurrentTaskScheduler.Default之间存在细微差别。也许这需要一个问题,即在哪些示例情况下会有不希望的差异。正如奥尔良文档指出的那样,不要使用Task.Run,而是指导Task.Factory.StartNew,我想知道是否应该按照权威机构的建议定义TaskCreationOptions.DenyAttachChild,如Task.Run 与 Task.Factory.StartNewStephen Toub和StartNew 的Stephen Cleary危险的. 嗯,除非我弄错了,否则看起来会是这样。.Default.DenyAttachChilld

此外,由于自定义调度程序存在问题,我想知道是否可以通过使用任务调度程序(Task.Factory)中解释的自定义Task.RunTaskFactory并控制线程数如何:创建一个限制并发的任务计划程序Task.Factory.CreateNew

嗯,这已经是一个相当长的“思考”了。我想知道我应该如何关闭它?也许如果svickDaniel可以将他们的评论作为答案,我会同时支持并接受svick 的

4

1 回答 1

1

您可以TaskBuilder在 FSharpx 中使用 use 并传入TaskScheduler.Current. 这是我翻译的尝试RefreshHubs。请注意,Task<unit>用于代替Task.

let RefreshHubs _ =
    let task = TaskBuilder(scheduler = TaskScheduler.Current)
    task {
        let addresses = 
            RoleEnvironment.Roles.["GPSTracker.Web"].Instances
            |> Seq.map (fun instance ->
                let endpoint = instance.InstanceEndpoints.["InternalSignalR"]
                sprintf "http://%O" endpoint.IPEndpoint
            )
            |> Seq.toList

        let newHubs = addresses |> List.filter (not << hubs.ContainsKey)
        let deadHubs = hubs.Keys |> Seq.filter (fun x -> 
            not (List.exists ((=) x) addresses))

        // remove dead hubs
        deadHubs |> Seq.iter (hubs.Remove >> ignore)

        // add new hubs
        let! _ = Task.WhenAll [| for hub in newHubs -> AddHub hub |]
        return ()
    }
于 2014-07-22T19:39:02.627 回答