0

当我一般使用 Giraffe 或 ASP.Net Core 时,我可以创建一个演员系统,将其添加为服务,然后让它认为请求处理程序选择任何演员并询问/告知消息。

无论是使用 Cluster.Sharding 还是普通的user/actor,我都知道它将是整个系统中处理多条消息的参与者的单个实例。

如何与 Streams 进行相同的通信?它们似乎不是路由器中的引用,也不是作为参与者路径的参与者系统:参与者引用、路径和地址。

这应该以不同的方式进行吗?

从 IO 部分复制,我可以具体化一个图来处理每个请求,但一般来说,我与“单例”(如域驱动设计聚合根)进行通信以处理域逻辑(这就是分片模块的原因),我不确定如何做可用于请求处理程序中新物化图中的单例接收器,因为所有请求必须只有一个接收器。

4

1 回答 1

2

有很多方法可以将 akka 流与外部系统集成。使接收者更容易的是Source.queue(有点类似于 System.Threading.Channels 并早于它们)。您可以在初始化点具体化您的流,然后在 Giraffe DI 中注册队列端点 - 这样您就不必为每个请求支付相同的流初始化成本:

open Akka.Streams
open Akkling
open Akkling.Streams
open FSharp.Control.Tasks.Builders

let run () = task {
    use sys = System.create "sys" <| Configuration.defaultConfig()
    use mat = sys.Materializer()
    
    // construct a stream with async queue on both ends with buffer for 10 elements
    let sender, receiver =
        Source.queue OverflowStrategy.Backpressure 10
        |> Source.map (fun x -> x * x)
        |> Source.toMat (Sink.queue) Keep.both
        |> Graph.run mat
        
    // send data to a queue - quite often result could be just ignored
    match! sender.OfferAsync 2 with
    | :? QueueOfferResult.Enqueued -> () // successfull
    | :? QueueOfferResult.Dropped -> () // doesn't happen in OverflowStrategy.Backpressure 
    | :? QueueOfferResult.QueueClosed -> () // queue has been already closed
    | :? QueueOfferResult.Failure as f -> eprintfn "Unexpected failure: %O" f.Cause
    
    // try to receive data from the queue
    match! receiver.AsyncPull() with
    | Some data -> printfn "Received: %i" data
    | None -> printfn "Stream has been prematurelly closed"
        
    // asynchronously close the queue
    sender.Complete()
    do! sender.WatchCompletionAsync()
}
于 2020-10-06T09:53:15.457 回答