5

我试图理解 F# 中的异步工作流,但我发现了一个我真的不明白的部分。

以下代码工作正常:

let asynWorkflow = async{
    let! result = Stream.TryOpenAsync(partition) |> Async.AwaitTask 
    return result
    } 

let stream = Async.RunSynchronously asynWorkflow
             |> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)

我定义了一个异步工作流,其中 TryOpenAsync 返回一个Task<StreamOpenResult>类型。我将其转换为Async<StreamOpenResult>Async.AwaitTask。(支线任务:“等待”任务?它不等待它只是转换它,是吗?我认为它与 Task.Wait 或 await 关键字无关)。我“等待”它let!并返回它。要启动工作流,我使用 RunSynchronously 应该启动工作流并返回结果(绑定它)。根据结果​​,我检查是否找到了流。

但现在我的第一个问题。为什么我必须将 TryOpenAsync 调用包装在另一个异步计算中并让!(“等待”)呢?例如以下代码不起作用:

let asynWorkflow =  Stream.TryOpenAsync(partition) |> Async.AwaitTask  

let stream = Async.RunSynchronously asynWorkflow
             |> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)

我认为 AwaitTask 使它成为Async<T>并且RunSynchronously应该启动它。然后使用结果。我想念什么?

我的第二个问题是为什么有任何“Async.Let!” 功能可用?也许是因为它不起作用或更好,为什么它不能与以下代码一起使用?

let ``let!`` task = async{
    let! result = task |> Async.AwaitTask 
   return result
   } 

let stream = Async.RunSynchronously ( ``let!`` (Stream.TryOpenAsync(partition))  )
         |> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)

我只是将 TryOpenAsync 作为参数插入,但它不起作用。通过说不起作用,我的意思是整个 FSI 都会挂起。所以它与我的异步/“等待”有关。

- - 更新:

FSI 中的工作代码结果:

>

Real: 00:00:00.051, CPU: 00:00:00.031, GC gen0: 0, gen1: 0, gen2: 0
val asynWorkflow : Async<StreamOpenResult>
val stream : Stream

FSI 中代码无效的结果:

>

而且您不能再在 FSI 中执行任何操作

--- 更新 2

我正在使用 Streamstone。这里是 C# 示例:https ://github.com/yevhen/Streamstone/blob/master/Source/Example/Scenarios/S04_Write_to_stream.cs

这里是 Stream.TryOpenAsync:https ://github.com/yevhen/Streamstone/blob/master/Source/Streamstone/Stream.Api.cs#L192

4

3 回答 3

4

Stream如果不知道它们是什么以及它们是如何工作的,我无法告诉你为什么第二个例子不起作用partition

但是,我想借此机会指出,这两个示例并非严格等价

F#async有点像做什么的“食谱”。当您编写async { ... }时,生成的计算只是坐在那里,实际上并没有做任何事情。它更像是声明一个函数而不是发出一个命令。只有当你通过调用类似的东西来“启动”它时,Async.RunSynchronouslyAsync.Start才会真正运行。一个推论是,您可以多次启动同一个异步工作流,并且每次都将是一个新的工作流。与工作方式非常相似IEnumerable

Task另一方面,C#更像是对已经在运行的异步计算的“引用”。只要调用Stream.TryOpenAsync(partition),计算就开始了,在任务实际开始之前是不可能获得Task实例的。您可以await生成Task多次,但每次await都不会导致重新尝试打开流。只有第一个await实际上会等待任务完成,而随后的每个都只会返回相同的记忆结果。

在异步/反应式术语中,F#async就是您所说的“冷”,而 C#Task被称为“热”。

于 2017-11-16T20:08:33.003 回答
3

第二个代码块看起来应该对我有用。Stream如果我为and提供虚拟实现,它会运行它StreamOpenResult

您应该尽可能避免使用Async.RunSynchronously,因为它违背了异步的目的。将所有这些代码放在一个更大的async块中,然后您将可以访问StreamOpenResult

async {
    let! openResult = Stream.TryOpenAsync(partition) |> Async.AwaitTask  
    let stream = if openResult.Found then openResult.Stream else Stream(partition)
    return () // now do something with the stream
    }

您可能需要将 a Async.StartorAsync.RunSynchronously放在程序的最外边缘才能实际运行它,但如果您有async(或将其转换为 a Task)并将其传递给可以调用的其他代码(例如 Web 框架),那就更好了它以非阻塞方式。

于 2017-11-16T13:47:15.900 回答
3

并不是说我想用另一个问题来回答你的问题,而是:你为什么要做这样的代码呢?这可能有助于理解它。为什么不只是:

let asyncWorkflow = async {
    let! result = Stream.TryOpenAsync(partition) |> Async.AwaitTask 
    if result.Found then return openResult.Stream else return Stream(partition) }

创建异步工作流只是为了立即调用RunSynchronously它没有什么意义——它类似于调用 a——.ResultTask只是阻塞当前线程,直到工作流返回。

于 2017-11-16T15:50:18.410 回答