0

根据 Akka.Net 文档,不推荐使用 PersistentView,而应使用 PersistenceQuery。在 ASP.Net Core 2.0 Web-API 应用程序中,我使用 Akka.Net 和事件源。我正在使用带有事件和快照的 SQL Server 插件来实现持久性。对于持久视图,我想开始使用 PersistenceQuery。当应用程序启动时,会回放事件以恢复参与者的状态。

我已经实现了一个日志阅读器,它正在接收事件,并使用它来组成一个视图。问题是,我怎样才能知道最后播放的事件已经到达,以便可以保存组合视图(作为一种快照)?我不想在恢复阶段的每个事件之后保存视图。

现在,当 ActorSystem 初始化(通过 Startup.cs 调用)时,日志阅读器就会启动。代码如下所示:

private static void InitialiseJournalReader()
{
    // Obtain read journal by plugin id.
    var readJournal = PersistenceQuery.Get(ActorSystem).ReadJournalFor<SqlReadJournal>("akka.persistence.query.myjournal");

    // Materialize stream, consuming events.
    var materializer = ActorMaterializer.Create(ActorSystem);

    var writer = ActorSystem.ActorOf(CreateViewsActor.GetActorProps(), CreateViewsActor.GetActorName());

    // issue query to journal
    Source<EventEnvelope, NotUsed> source = readJournal.CurrentEventsByTag("MyEvents");
    source.RunForeach(envelope => writer.Ask(envelope.Event), materializer);
}

CreateViewsActor是一个 Actor,它使用消息来创建一个或多个视图。它还必须保存这些视图(当前以 JSON 格式保存到 SQL Server 表中)。

不幸的是,到目前为止,我还没有找到通过期刊阅读器创建持久视图的工作示例。但也许我一直在寻找错误的地方。到目前为止,我有以下问题:

  1. 有没有通过期刊阅读器创建持久视图的工作示例?
  2. CreateViewsActor(或任何负责创建和保存视图的代码)如何知道所有恢复消息都已处理?
  3. 初始化期刊阅读器的最佳位置是什么?
4

1 回答 1

0

阅读期刊可用于多种用途。在大多数情况下,用于从事件中生成专门的读取视图。然而,这并不一定意味着演员 - 您可以轻松地将视图转换为数据库表中的更新以获得物化视图。

如果您想将 Actor 与流结合起来,您可以使用Sink.ActorRefSink.ActorRefWithAck,具体取决于您是想在 Actor 中包含背压还是在全推力模式下工作。例子:

using (var materializer = system.Materializer())
{
    var readJournal = PersistenceQuery.Get(system)
        .ReadJournalFor<SqlReadJournal>("akka.persistence.query.my-read-journal");

    var writer = asysem.ActorOf(CreateViewsActor.Props(), CreateViewsActor.GetActorName());

    readJournal
        .CurrentEventsByTag("MyEvents")
        .Collect(envelope => envelope.Event as MyEvent)
        .RunWith(Sink.ActorRefWithAck<MyEvent>(writer, 
            onInitMessage: CreateViewsActor.Init.Instance,
            ackMessage: CreateViewsActor.Ack.Instance,
            onCompleteMessage: CreateViewsActor.Done.Instance), materializer);
}

这里从 actor发送一个initack消息以通知流,何时开始发射或只是将下一个元素发射到 actor(如果可用)。一旦流完成,最后一个接收器参数(在完整消息上)将被发送到 actor 。

如有疑问,您可以随时查看官方 Akka.NET 测试(参见:12)。

关于期刊阅读器初始化 - 它是一个对象,它的生命周期与参与者系统绑定,因此它可以在与参与者系统相同的位置进行初始化和可用。

于 2018-03-09T07:35:48.547 回答