3

在持久化actor的receive方法中,我接收到一堆我想要持久化的事件,只有在所有事件都持久化之后,再次更新我的状态。我怎样才能做到这一点?

def receive: Receive = {
  ...
  case NewEvents(events) =>
    persist(events) { singleEvent =>
      // Update state using this single event
    }
    // After every events are persisted, do one more thing
}

请注意,persist() 调用不会阻塞,因此我不能在此之后放置我的代码。


更新:为什么我需要这个

这些新事件来自外部 Web 服务。我的持久角色需要在其状态中存储最后一个事件 id,当它接收到命令时,它将用于后续的 ws 调用。问题是这些命令可能同时出现,所以我需要某种锁定系统:

  • 接收到的 ws 调用命令:隐藏下一个命令,直到这个命令完成(也就是说,总而言之,一个布尔值)
  • 收到来自 ws的响应:存储它们,更新状态并保存最后一个 id,对存储中的所有命令执行另一个单个 ws 调用(我让命令发送者能够在完成后全部响应它们)否则不要再隐藏命令了。
4

1 回答 1

1

我还没有尝试过defer,我最初的解决方案是给自己PersistEventsDone发消息。它之所以有效,是因为该persist方法将存储所有传入的消息,直到所有事件处理程序都被执行。如果进程中有另一个命令,它是在之前还是之后并不重要PersistEventsDone

def receive: Receive = {
  ...
  case PersistEventsDone =>
    ...
  case NewEvents(events) =>
    persist(events) { singleEvent =>
      // Update state using this single event
    }
    self ! PersistEventsDone
}

defer在我的情况下有点奇怪,因为它需要一个我不需要的事件。但它看起来仍然比我的解决方案更自然。

于 2014-12-19T22:06:52.527 回答