1

我正在做 CQRS Akka 演员应用程序的查询端。

查询参与者设置为集群分片,并填充来自一个持久性查询流的事件。

我的问题是:

  1. 如果集群分片中的参与者之一重新启动如何恢复它?

    • 关闭整个集群分片并回复所有事件?
    • 使集群分片中的参与者持久化参与者并仅为查询端保存一组新事件?
  2. 如果使用 Persistence Query 填充的 actor 重新启动,我如何取消当前的 PQ 并重新启动它?

4

1 回答 1

2

正如所讨论的,我将评估将您的查询端持久保存在数据库中。

如果这不是一个选项,并且您想坚持每个分片的单个持久性查询,请在查询参与者中执行以下操作:

var inRecovery: Boolean = true;

override def preStart( ) = {
    //Subscribe to your event live stream now, so you don't miss anything during recovery
    // e.g. send Subscription message to your persistence query actor

    //Re-Read everything up to now for recovery
    readJournal.currentEventsByPersistenceId("persistenceId")
        .watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished
        .map(Replay.apply) // Mark your replay messages
        .runWith( Sink.actorRef( self, tag ) ) // Send all replay events to self
}

override def receive = {
    case Done => // Recovery is finished
        inRecovery = false
        unstashAll() // unstash all normal messages received during recovery

    case Replay( payload ) =>
        //handle replayed messages

    case events: Event =>
        //handle normal events from your persistence query
        inRecovery match {
            case true => stash() // stash normal messages until recovery is done
            case false => 
                // recovery is done, start handling normal events
        }
}


case class Replay( payload: AnyRef )

所以基本上在actor开始订阅持久性查询actor之前,使用所有过去事件的有限流恢复状态,所有事件都通过后终止。在恢复期间存储所有传入事件,这些事件不是重播事件。然后在恢复完成后,取消存储所有内容并开始处理正常消息。

于 2016-07-04T22:05:23.977 回答