I try to write an app using that is using Akka (version 2.4.0) Persistency and Cassandra Plugin (version 0.6, https://github.com/krasserm/akka-persistence-cassandra) to recover from failures.
The events are being stored to cassandra with no issues, however, one I try to kill and actor, so the supervisor restarts it, the events are not received by receiveRecover
.
It seems that the issue is with the plugin itself, as if I use shared LevelDB instead of cassandra, the events are being received on the recovery step.
Here is the implementation of my persistent actor:
class SimplePersistentActor extends PersistentActor with ActorLogging {
def persistenceId: String = context.self.path.name
override def preRestart(cause: Throwable, msg: Option[Any]) = {
log.debug(s"Restarting ${getClass.getSimpleName}")
super.preRestart(cause, msg)
}
override def postStop() = {
log.debug(s"Stopping ${getClass.getSimpleName}")
super.postStop()
}
var transactionData: Either[UninitializedData, RunningTransactionData] = Left(UninitializedData())
def receiveCommand ={
case msg @ TransactionStart(transactionId) =>
persist(msg) { _ => }
log.debug(s"Starting a transaction with id $transactionId")
transactionData = Right(RunningTransactionData(transactionId, List()))
/* Send a reply */
sender() ! transactionId
case msg @ TransactionData(data) =>
persist(msg) { _ => }
transactionData match {
case Right(t: RunningTransactionData) =>
val updatedTransaction = t.copy(data = t.data ::: List(data))
log.debug(s"There are ${updatedTransaction.data.size} data items within a transaction ${t.transactionId}")
transactionData = Right(updatedTransaction)
/* Send a reply */
sender() ! t.transactionId
case _ => log.error("Actor's transaction data is not initialized")
}
case TransactionEnd(transactionId) =>
transactionData match {
case Right(t: RunningTransactionData) =>
log.debug(s"Ending a transaction with id ${t.transactionId}")
transactionData = Left(UninitializedData())
/* Send a reply */
sender() ! t.transactionId
case _ => log.error("Actor's transaction data is not initialized")
}
case other =>
log.debug(s"Unexpected event received: $other")
}
def receiveRecover = {
case message =>
log.debug(s"Recovery Step. Message $message received")
}
}
In both cases, that I describe above, the code doesn't change. Has anyone seen this issue before?