
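I am trying to test a new feature of Akka 2.4, PersistentFSM (http://doc.akka.io/docs/akka/2.4.0/scala/persistence.html#Persistent_FSM), using the simple example below.

This example generates 5 random integers and adds them to a sequence. What I would like to do is persist the data and, on the next run of the application, continue appending to the existing sequence of numbers. The current documentation for PersistentFSM is a bit scarce, and there is no clear way to accomplish this. Any ideas?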

TestFSM.scala:

import akka.actor.{Actor, ActorSystem, Props}
import akka.persistence.fsm.PersistentFSM
import akka.persistence.fsm.PersistentFSM.FSMState
import scala.reflect._
import scala.util.Random

final case class SetNumber(num: Integer)

sealed trait State extends FSMState
case object Idle extends State {
    override def identifier: String = "Idle"
}
case object Active extends State {
    override def identifier: String = "Active"
}

sealed trait Data {
    def add(number: Integer): Data
}
case object Empty extends Data {
    def add(number: Integer) = Numbers(Vector(number))
}
final case class Numbers(queue: Seq[Integer]) extends Data {
    def add(number: Integer) = Numbers(queue :+ number)
}

sealed trait DomainEvt
case class SetNumberEvt(num: Integer) extends DomainEvt

class Generator extends Actor with PersistentFSM[State, Data, DomainEvt] {

    override def applyEvent(domainEvent: DomainEvt, currentData: Data): Data = {
        domainEvent match {
            case SetNumberEvt(num) => currentData.add(num)
        }
    }

    override def persistenceId: String = "generator"

    override def domainEventClassTag: ClassTag[DomainEvt] = classTag[DomainEvt]

    startWith(Idle, Empty)

    when(Idle) {
        case Event(SetNumber(num), Empty) =>
            goto(Active) applying SetNumberEvt(num)
    }

    when(Active) {
        case Event(SetNumber(num), numbers: Data) =>
            println(numbers)
            stay applying SetNumberEvt(num)
    }

    initialize()

}

object TestFSM extends App {

    val system = ActorSystem()

    val actor = system.actorOf(Props[Generator])

    actor ! SetNumber(Random.nextInt())
    actor ! SetNumber(Random.nextInt())
    actor ! SetNumber(Random.nextInt())
    actor ! SetNumber(Random.nextInt())
    actor ! SetNumber(Random.nextInt())

    Thread.sleep(1000)
    system.terminate()

}

reference.conf:

akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.leveldb.dir = "target/journal"
akka.persistence.snapshot-store.local.dir = "target/snapshots"

1 Answer


PersistentFSM implicitly persists state changes and domain events as the FSM runs.

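Although the documentation you linked to does not spell this out, you can see it in the code, where persistAll is called behind the scenes at the appropriate time: https://github.com/akka/akka/blob/v2.4.0/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala#L110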
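To make the idea concrete, here is a minimal sketch of what replaying persisted domain events amounts to. It does not use Akka at all: it only folds a list of events through an applyEvent function shaped like the one in the question. The names ReplaySketch and replay are hypothetical, the journal is just an in-memory Seq, and Int is used instead of Integer for brevity, so treat it as an illustration of the concept rather than of what PersistentFSM does internally.

```scala
// Sketch only: imitates event replay without Akka.
// ReplaySketch and replay are hypothetical names used for illustration.
sealed trait Data { def add(number: Int): Data }
case object Empty extends Data {
  def add(number: Int): Data = Numbers(Vector(number))
}
final case class Numbers(queue: Seq[Int]) extends Data {
  def add(number: Int): Data = Numbers(queue :+ number)
}

sealed trait DomainEvt
final case class SetNumberEvt(num: Int) extends DomainEvt

object ReplaySketch {
  // Same shape as the applyEvent override in the question.
  def applyEvent(event: DomainEvt, current: Data): Data = event match {
    case SetNumberEvt(num) => current.add(num)
  }

  // Conceptually, recovery folds the journaled events over the initial data.
  def replay(journal: Seq[DomainEvt]): Data =
    journal.foldLeft(Empty: Data)((data, evt) => applyEvent(evt, data))

  def main(args: Array[String]): Unit = {
    // Events written during a first run (assumed values).
    val firstRun = Seq(SetNumberEvt(1), SetNumberEvt(2))
    // A later run appends to the same journal.
    val secondRun = firstRun :+ SetNumberEvt(3)

    println(replay(firstRun))  // Numbers(Vector(1, 2))
    println(replay(secondRun)) // Numbers(Vector(1, 2, 3))
  }
}
```

The point of the sketch is that the sequence survives a restart only if the events themselves survive, which is why the choice of journal plugin matters.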

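I suggest you test the implementation to see whether it does in fact persist.

Edit:

I was able to run your code, unchanged, and saw the target/journal directory being created. I used the following build.sbt: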

name := "Scratch"

version := "1.0"

scalaVersion := "2.11.7"

sbtVersion := "0.13.7"

mainClass := Some("TestFSM")

fork := true

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.0"

libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.4.0"

libraryDependencies += "org.iq80.leveldb"            % "leveldb"          % "0.7"

libraryDependencies += "org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8"

and the following application.conf:

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.leveldb.dir = "target/journal"
akka.persistence.snapshot-store.local.dir = "target/snapshots"

I think application.conf is the appropriate place for application-specific configuration, rather than reference.conf, which is intended for libraries.

answered 2015-10-02T13:45:21.010