持久性 Actor 一次接收一条消息。在持久化 100 条消息后,它应该将这 100 条消息发送到目标 Actor。我打算在保存快照后删除这些消息。
我在每 50 条消息后进行快照。我在收到 SaveSnapshotSuccess 后删除消息。但删除不会发生。我仍然在附加的日志中看到这些消息。没有收到 DeleteMessagesSuccess/Failure 消息。在删除快照时也没有收到 DeleteSnapshotsSuccess/Failure。
以下是版本:
"com.typesafe.akka"%% "akka-actor" % "2.4.0",
"com.typesafe.akka"%% "akka-cluster" % "2.4.0",
"com.typesafe.akka"%% "akka-contrib" % "2.3.14",
"com.typesafe.akka"%% "akka-slf4j"% "2.4.0",
"com.typesafe.akka"%% "akka-testkit"% "2.4.0" % "test",
"com.typesafe.akka"%% "akka-persistence" % "2.4.0"
代码:
/**
 * Persistent actor that journals every incoming `Command` as an `Event`,
 * accumulates the events in `state`, takes a snapshot every 50 accumulated
 * events and flushes the batch to `DestinationActorRef` once 100 events
 * have been accumulated.
 *
 * After a snapshot has been saved successfully, journal entries and
 * snapshots older than that snapshot are deleted.
 */
class Consumer extends PersistentActor with ActorLogging {

  override def persistenceId = "bulkMessageConsumer-1"

  /** Events accumulated since the last flush to the destination actor. */
  var state = BulkState()

  /** Folds a single event into the accumulated state. */
  def updateState(event: Event): Unit = {
    state = state.updated(event)
  }

  /** Number of events currently held in `state`. */
  def numEvents = state.size

  /** Rebuilds `state` from a snapshot offer and/or replayed events. */
  val receiveRecover: Receive = {
    case evt: Event => updateState(evt)
    case SnapshotOffer(_, snapshot: BulkState) => state = snapshot
  }

  val receiveCommand: Receive = {
    case Snapshoot =>
      saveSnapshot(state)

    case SaveSnapshotSuccess(metadata) =>
      // Everything strictly older than the snapshot that was just saved is
      // no longer needed for recovery.
      deleteMessages(toSequenceNr = metadata.sequenceNr - 1)
      deleteSnapshots(
        SnapshotSelectionCriteria(
          maxSequenceNr = metadata.sequenceNr - 1,
          maxTimestamp = metadata.timestamp - 1))

    case SaveSnapshotFailure(metadata, reason) =>
      log.error(reason, "Unable to save snapshot [metadata: {}]", metadata)

    case Command(data) =>
      persistAsync(Event(data)) { event =>
        // Apply the event first so the size checks see the batch that
        // includes it; otherwise the snapshot/flush would only trigger when
        // the next command arrives.
        updateState(event)
        basicChecks
      }

    // The deletion replies are case classes carrying payloads. Matching on
    // the bare name (e.g. `case DeleteMessagesSuccess =>`) compares against
    // the companion object and therefore never matches an actual reply, so
    // they must be destructured.
    case DeleteSnapshotsSuccess(criteria) =>
      log.info("deleting snapshots [criteria: {}]", criteria)

    case DeleteSnapshotsFailure(criteria, cause) =>
      log.error(cause, "deleting snapshots failure [criteria: {}]", criteria)

    case DeleteMessagesSuccess(toSequenceNr) =>
      log.info("messages deleted succesfully: {}", toSequenceNr)

    case DeleteMessagesFailure(cause, toSequenceNr) =>
      log.error(cause, "messages deletion failure [toSequenceNr: {}]", toSequenceNr)

    case Print =>
      println(state)
  }

  /** Sends the framed message to the destination actor and clears `state`. */
  def send(): Unit = {
    val framedMessage = MyMessage
    DestinationActorRef ! framedMessage
    remove()
  }

  /** Drops all accumulated events from `state`. */
  def remove(): Unit = {
    state = state.removeAll
  }

  /** True when exactly 100 events have been accumulated. */
  def checkNumEventsReachBulkSize: Boolean = {
    numEvents == 100
  }

  /** True when the accumulated event count is a positive multiple of 50. */
  def checkSnapshotBatch: Boolean = {
    numEvents % 50 == 0 && numEvents > 0
  }

  /**
   * Requests a snapshot when a snapshot boundary is reached and flushes the
   * batch when the bulk size is reached. The snapshot request is sent to
   * `self`, so the snapshot is taken when that message is processed, i.e.
   * after any flush performed here.
   */
  def basicChecks: Unit = {
    if (checkSnapshotBatch) {
      self ! Snapshoot
    }
    if (checkNumEventsReachBulkSize) {
      send()
    }
  }
}