您可能需要两个参与者:一个(协调者)将向客户端发送有关聊天命令的通知。另一个(节流器) - 将每 2 分钟将数据推送到数据库。您的队列将只是限制器的内部状态:
class Coordinator extends Actor {
def receive = {
case command: Record =>
broadcast(command)
throttler ! command
}
}
class Throttler extends Actor {
import system.dispatcher
val queue = mutable.List[Record] //it may be a cache instead
def schedule = system.scheduler.scheduleOnce(2 minutes, self, Tick) // http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html
def receive = {
case Start => schedule
case command: Record =>
queue ++= command
case Tick =>
schedule
try {
//---open transaction here---
for (r <- queue) push(r)
//---close transaction here---
queue.clear //will not be cleared in case of exception
} catch {...}
}
}
正如@abatyuk 所说,您也可以使用基于 FSM 的实现。
如果您需要减少邮箱的负载 - 您可以尝试一些背压/负载平衡模式,例如Akka Work Pulling。
如果您想保护节点本身(在某些服务器节点发生故障时恢复队列状态) - 您可以使用 Akka Cluster 来复制(手动)队列的状态。在这种情况下,协调者应该是集群单例,并且应该自己发送滴答声给随机参与者(您可以为此使用总线)并作为主管维护他们的成功和失败。请注意,主管状态可能会丢失,因此您还应该通过节点复制它(并每 2 分钟在它们之间合并状态,因此最好SortedSet
用于队列,因为合并将类似于sets.reduce(_ ++ _)
)。
像 Riak 这样的存储已经提供了一种解决集群问题的简单方法,因此您可以将它们用作队列(协调器和节流器都将是“无状态”单例)。在 Riak 的情况下,您可以将其配置为可用+分区(请参阅 CAP 定理),因为在这里合并数据不是问题 - 您的聊天记录是CRDT(无冲突)数据类型。
另一种解决方案是使用 WriteBehind 模式(配置为每 2 分钟启动一次)作为节流器的缓存。
事件溯源也可以保护您的参与者的状态,但是当您需要在恢复后重做所有操作时它更有用(您不需要这个 - 它会将所有内容重新提交到数据库)。您可以使用快照(这与直接使用缓存几乎相同),但如果您关心可用性,最好将它们保存到缓存(通过实现 SnapshotStore)而不是本地 FS。请注意,您可能还必须删除以前保存的快照以减少存储大小。并且您应该同步保存每条记录以避免丢失状态。
PS不要忘记向发件人确认消息(对您的javascript),否则即使将缓存作为队列,您也可能丢失最后的消息(在邮箱中)。
PS/2 数据库对于参与者的状态持久性几乎总是一个糟糕的解决方案,因为它很慢并且可能变得不可用。我也不推荐像 MongoDB 这样的强一致性 NoSQL 解决方案——最终一致性是您的最佳选择。