5

我正在使用 akka 持久性尝试实现一个服务,其中我的状态可能非常大(假设它不适合 RAM)某些实体列表。假设用户希望所有实体的所有历史记录都可用。我可以在 akka 持久性中做到这一点吗?

现在我的演员状态是这样的。

case class System(var processes: Map[Long, Process] = Map()) {

  def updated(event: Event): System = event match {
    case ProcessDetectedEvent(time, activitySets, id, processType) =>
      val process = Process(activitySets.coordinates, time, activitySets.channels, id, processType, false)
      copy(processes = processes + (id -> process))

    case ProcessMovedEvent(id, activitySets, time) =>
      val process = Process(activitySets.coordinates, time, activitySets.channels, id, processes(id).processType, false)
      copy(processes = processes + (id -> process))

    case ProcessClosedEvent(time, id) =>
      val currentProcess = processes(id)
      val process = Process(currentProcess.coordinates, time, currentProcess.channels, id, currentProcess.processType, true)
      copy(processes = processes + (id -> process))
    case _ => this
  }

}

如您所见,进程映射存储在内存中,因此如果进程数量很大,应用程序可能会耗尽内存。

4

3 回答 3

0

Akka 持久性被有状态的 Actor 用于在 Actor 启动时恢复其内部状态,在 JVM 崩溃后重新启动或由主管重新启动,或在集群中迁移。在这种情况下,从长远来看,应用程序/JVM 可能会因 OutOfMemory 异常而崩溃。当这个actor重新启动时,持久性恢复机制会在map中重新创建所有Process的信息。但是总内存又会很高,应用程序在运行时可能会再次崩溃。因此,在这种情况下,持久性将无助于避免应用程序崩溃,除非您仅保留部分进程列表以减少内存。

所以首先你需要想办法解决这个内存异常。也许您可以尝试以下选项。

  1. 尝试在 OutOfMemory 异常后重新启动 JVM 期间增加 JVM 堆大小。
  2. 在恢复状态时,仅重播选定的消息列表,以便使用的总内存较低但状态不完整。

如果恢复期间要重播的消息列表太大,可以使用快照来减少状态恢复时间。

于 2015-10-12T13:14:09.650 回答
0

也许您想考虑是否有有意义的方法将您的数据集划分为具有合理界限的范围。然后你可以用一个持久的actor来表示每个范围,如果你需要跨越整个商店的信息,你就必须有某种协调器来管理范围和迭代它们。但取决于这开始变得多么复杂,在某些时候,我不得不怀疑你是否会重新发明 map-reduce 或 Spark。

于 2015-10-31T20:45:52.463 回答
-1

我认为您可能正在寻找(至少这是另一种选择)是快照。

在使用事件溯源和事件回复时,通常建议的方法是每隔一段时间使用一次快照。

因此,当您取回事件时,您会取回快照,然后是自该快照以来发生的事件。这意味着您从事件存储中流式传输的对象更少(内存更少),需要处理和应用的东西更少(更快)......但这确实带来了它自己的权衡......我不会在这里讨论。

这又只涵盖了最常见的场景。如果您的事件处理发生变化,那么您可能需要重建您的事件......尽管这会引发一些关于您如何构建系统的相当严肃和有趣的问题。

我没有仔细看,但 akka 可能内置了快照的概念。如果不是这样,那么当你开始走上所有那些以理想方式少有人走但现实世界向你抛出去的道路时,前方的道路上就会有一条学习曲线和大量的试验和错误。

于 2015-10-22T18:28:38.360 回答