这样的东西会满足您的需求吗?这可能不完全是你的逻辑,但基本上它接收一个新事件,坚持它接收到事件的事实,然后使用 id 将事件保存到地图中。然后在某个时候(不确定如何触发事件处理),它会收到处理具有特定 ID 的事件的命令。它坚持它应该处理事件的事实,然后处理事件并将其从地图中删除。这样,地图将在重新启动时恢复,并且您可以通过 Id 访问所有尚未处理的事件。
class PersistentMapActor extends PersistentActor {
private var eventMap: Map[ Int, Event ] = Map[ Int, Event ]( )
override def receiveRecover: Receive = {
case NewEventSaved( payload: Event ) =>
eventMap = eventMap + ( (payload.eventId, payload) )
case EventHandled( eventId ) =>
eventMap = eventMap - eventId
}
override def receiveCommand: Receive = {
case SaveNewEvent( payload ) =>
persist( NewEventSaved( payload ) ) { persistedNewEvent =>
//Add new event to map
eventMap = eventMap + ( (payload.eventId, payload) )
}
case HandleEvent( eventId ) =>
val event = eventMap.get( eventId )
event.foreach { e =>
persist( EventHandled( eventId ) ) { persistedEvent =>
//Do stuff with the event
//Remove the event from the map
eventMap = eventMap - eventId
}
}
}
override def persistenceId: String = "PersistentMapActor"
}
object PersistentMapActor {
case class Event( eventId: Int, someField: String )
case class SaveNewEvent( payload: Event )
case class NewEventSaved( payload: Event )
case class HandleEvent( eventId: Int )
case class EventHandled( eventId: Int )
}