2

我有一个执着的演员。当它第一次启动(数据库为空)时,我会保留一些初始数据。但是状态并没有像我预期的那样得到更新。它仅在处理第一条消息后才更新。如何让演员在状态更新后开始处理消息?

演员代码

class TestActor extends PersistentActor {
  var numberOfEvents = 0

  def updateState(e: Any): Unit = {
    println("updating")
    numberOfEvents += 1
  }

  override def receiveRecover: Receive = {
    case RecoveryCompleted =>
      if (numberOfEvents == 0) {
        println("persisting")
        persist("foo")(updateState)
      }
  }

  override def receiveCommand: Receive = {
    case _ => {
      println("answering")
      sender ! numberOfEvents
    }
  }
}

测试代码

Await.result(actorRef ? "stats", Duration.Inf) shouldBe 0 // I wan't 1 here
Await.result(actorRef ? "stats", Duration.Inf) shouldBe 1

输出

persisting
answering // why this goes before updating?
updating
answering

完整代码

4

1 回答 1

1

您要重新考虑的一件事是,您通常不会更新 RecoveryCompleted 事件的状态,而是处理您坚持的事件以重构状态。RecoveryCompleted 消息用于处理恢复结束时要执行的操作。这些事件将是从您坚持的日志中重放的事件。如果您使用快照,您还可以选择获得快照事件。

例如:

override def receiveRecover: Receive = {
    case Added(num) =>
        updateState(num) 

    case SnapshotOffer(metadata, snapshot) ⇒
       // Restore your full state from the data in the snapshot

    case RecoveryCompleted =>
        println("Recovery completed") // use logger here
  }

另一方面,receiveCommand 用于处理传入的命令并持久化这些事件,然后在这些事件更新后更新内部状态。

override def receiveCommand: Receive = {
    case Add(num) => {
      println("received event and persisting")

      persist(Added(num){ evt ⇒ 
        // This gets called after the persist succeeds
        updateState(num)
        sender ! numberOfEvents
      }      
    }
  }

def updateState(e: Int): Unit = {
    println("updating")
    numberOfEvents += e
  }

在消息传递方面,我发现将事件命名为命令的过去时很有用,如下所示:

// 事件

案例类已添加(v:Int)

// 命令

案例类 Add(v:Int)

希望这使它更清楚一点。

于 2016-04-16T11:54:41.827 回答