2

I try to write an app using that is using Akka (version 2.4.0) Persistency and Cassandra Plugin (version 0.6, https://github.com/krasserm/akka-persistence-cassandra) to recover from failures. The events are being stored to cassandra with no issues, however, one I try to kill and actor, so the supervisor restarts it, the events are not received by receiveRecover.

It seems that the issue is with the plugin itself, as if I use shared LevelDB instead of cassandra, the events are being received on the recovery step.

Here is the implementation of my persistent actor:



    class SimplePersistentActor extends PersistentActor with ActorLogging {

      def persistenceId: String = context.self.path.name

      override def preRestart(cause: Throwable, msg: Option[Any]) = {
        log.debug(s"Restarting ${getClass.getSimpleName}")
        super.preRestart(cause, msg)
      }

      override def postStop() = {
        log.debug(s"Stopping ${getClass.getSimpleName}")
        super.postStop()
      }

      var transactionData: Either[UninitializedData, RunningTransactionData] = Left(UninitializedData())

      def receiveCommand ={
        case msg @ TransactionStart(transactionId) =>
          persist(msg) { _ => }
          log.debug(s"Starting a transaction with id $transactionId")
          transactionData = Right(RunningTransactionData(transactionId, List()))

          /* Send a reply */
          sender() ! transactionId

        case msg @ TransactionData(data) =>
          persist(msg) { _ => }

          transactionData match {
            case Right(t: RunningTransactionData) =>
              val updatedTransaction = t.copy(data = t.data ::: List(data))
              log.debug(s"There are ${updatedTransaction.data.size} data items within a transaction ${t.transactionId}")
              transactionData = Right(updatedTransaction)

              /* Send a reply */
              sender() ! t.transactionId          

            case _ => log.error("Actor's transaction data is not initialized")
          }

        case TransactionEnd(transactionId) =>
          transactionData match {
            case Right(t: RunningTransactionData) =>
              log.debug(s"Ending a transaction with id ${t.transactionId}")
              transactionData = Left(UninitializedData())

              /* Send a reply */
              sender() ! t.transactionId

            case _ => log.error("Actor's transaction data is not initialized")
          }      

        case other =>
          log.debug(s"Unexpected event received: $other")
      }

      def receiveRecover = {
        case message =>
          log.debug(s"Recovery Step. Message $message received")
      }
    }


In both cases, that I describe above, the code doesn't change. Has anyone seen this issue before?

4

1 回答 1

0

我遇到了同样的事情,并找到了适合我的修复程序。我知道您的问题已经很老了,但是由于我在最新版本的 Akka Cassandra Persistence (0.86) 上看到了同样的问题,所以我认为值得一提。

我遇到的问题来自以下配置。

cassandra-main-journal = ${cassandra-journal} {
  contact-points = ["localhost"]
  keyspace-autocreate = true
  tables-autocreate = true
  keyspace = "main_akka_journal"
}

因此,采用默认cassandra-journal配置并覆盖keyspace. 然后,就像您正在做的那样,persistenceId在 Akka 持久性参与者中覆盖以指向此配置。

如果你这样做会发生什么是对 Actor 的所有写入都进入main_akka_journal键空间。在重新启动 Actor 时,您会收到一条RecoveryCompleted消息,但您看不到任何您编写的消息。但是,当您收到RecoveryCompletedlastSequenceNr是正确的。

有趣的是,如果你有keyspace-autocreate=true,你会看到创建了两个键空间。main_akka_journalakka

所以问题是持久性参与者正在写入main_akka_journal键空间,重新启动时它从akka键空间(为空)读取事件,然后lastSequenceNr从键空间读取事件main_akka_journal(这是正确的)。

我的解决方案是这个配置:

cassandra-main-journal = ${cassandra-journal} {
  contact-points = ["localhost"]
  keyspace-autocreate = true
  tables-autocreate = true
  keyspace = "main_akka_journal"
  query-plugin = "cassandra-main-query-plugin"
}

cassandra-main-query-plugin = ${cassandra-query-journal} {
  write-plugin = "cassandra-main-journal"
}

否则默认情况下write-plugin指向cassandra-journalakka键空间。

于 2018-07-24T16:10:33.463 回答