我正在做 CQRS Akka 演员应用程序的查询端。
查询参与者设置为集群分片,并填充来自一个持久性查询流的事件。
我的问题是:
如果集群分片中的参与者之一重新启动如何恢复它?
- 关闭整个集群分片并回复所有事件?
- 使集群分片中的参与者持久化参与者并仅为查询端保存一组新事件?
如果使用 Persistence Query 填充的 actor 重新启动,我如何取消当前的 PQ 并重新启动它?
我正在做 CQRS Akka 演员应用程序的查询端。
查询参与者设置为集群分片,并填充来自一个持久性查询流的事件。
我的问题是:
如果集群分片中的参与者之一重新启动如何恢复它?
如果使用 Persistence Query 填充的 actor 重新启动,我如何取消当前的 PQ 并重新启动它?
正如所讨论的,我将评估将您的查询端持久保存在数据库中。
如果这不是一个选项,并且您想坚持每个分片的单个持久性查询,请在查询参与者中执行以下操作:
var inRecovery: Boolean = true;
override def preStart( ) = {
//Subscribe to your event live stream now, so you don't miss anything during recovery
// e.g. send Subscription message to your persistence query actor
//Re-Read everything up to now for recovery
readJournal.currentEventsByPersistenceId("persistenceId")
.watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished
.map(Replay.apply) // Mark your replay messages
.runWith( Sink.actorRef( self, tag ) ) // Send all replay events to self
}
override def receive = {
case Done => // Recovery is finished
inRecovery = false
unstashAll() // unstash all normal messages received during recovery
case Replay( payload ) =>
//handle replayed messages
case events: Event =>
//handle normal events from your persistence query
inRecovery match {
case true => stash() // stash normal messages until recovery is done
case false =>
// recovery is done, start handling normal events
}
}
case class Replay( payload: AnyRef )
所以基本上在actor开始订阅持久性查询actor之前,使用所有过去事件的有限流恢复状态,所有事件都通过后终止。在恢复期间存储所有传入事件,这些事件不是重播事件。然后在恢复完成后,取消存储所有内容并开始处理正常消息。