是否可以在 Akka.Net 参与者计算中等待(不阻塞) Async<'t> ?我想实现类似于以下的东西。
actor {
let! msg = mailbox.Receive()
match msg with
| Foo ->
let! x = async.Return "testing 123" // Some async function, return just an example
() // Do something with result
}
不,您不能在演员的邮箱中使用async
/await
或其任何变体并获得安全的结果。
每个参与者都维护自己的上下文,其中包括重要的细节,如前一条消息的发送者和其他可能改变的重要状态。Actor 串行处理消息,因此一旦其邮箱中的调用完成,它就会立即开始处理下一条消息 - 如果您在邮箱中放置 await 调用,则 Actor 将处理与您开始处理的消息完全不同的消息您的等待电话返回的时间。
利用异步调用和参与者内部的 TAP 的更好模式是使用 PipeTo 模式。看起来我们在http://akkadotnet.github.io/上还没有任何文档,所以我会给你一个真实世界的代码示例(在 C# 中):
public void Handle(ExplicitReplyOperation<CampaignsForAppRequest> message)
{
Context.IncrementMessagesReceived();
_loaderActor.Ask(message.Data).ContinueWith(r =>
{
var campaigns = (IList<Campaign>)r.Result;
message.Originator.Tell(new CampaignsForAppResponse()
{
AppId = message.Data.AppId,
ActiveCampaigns = campaigns
}, ActorRef.NoSender);
return campaigns;
}).PipeTo(Self);
}
在这个示例中,我有一个TypedActor
继续一个任务,进行一些后处理,然后使用PipeTo
操作符(一种可以应用于任何Task
对象的 Akka.NET 扩展方法)在操作完成后将任务结果通过管道传输到该参与者的邮箱中完成了。这样我就可以关闭我需要的任何状态,并且我的参与者可以在此异步操作继续时以安全的方式继续处理消息。
现在看来这是可能的!
let system = ConfigurationFactory.Default() |> System.create "FSharpActors"
let asyncActor =
spawn system "MyActor"
<| fun mailbox ->
let rec loop() =
actor {
let! name = mailbox.Receive()
Akka.Dispatch.ActorTaskScheduler.RunTask(fun () ->
async {
printfn "Hello %s" name
do! Async.Sleep 5000
} |> Async.StartAsTask :> Threading.Tasks.Task)
return! loop()
}
loop()
asyncActor <! "Alice"
asyncActor <! "Bob"
asyncActor <! "Eve"