14

我在 Akka 中有一个演员,它将处理消息以创建某些实体。这些实体上的某些字段是根据创建时数据库中其他实体的状态计算的。

我想避免创建参与者处理速度快于数据库能够持久化实体的竞争条件。这可能会导致数据不一致,如下所示:

  • Actor 创建一个Foo并将其发送给其他 Actor 进行进一步处理和保存
  • 要求演员创建另一个Foo. 由于第一个还没有保存,所以新的是根据数据库的旧内容创建的,从而创建了一个错误的Foo.

现在,这种可能性非常小,因为Foos 的创建将被手动触发。但仍然可以想象,在高负载下双击可能会出现问题。谁知道明天Foo是否会自动创建。

因此,我需要某种方法来告诉演员等待,并且只有在确认Foos 已保存后才能恢复其操作。

有没有办法让演员进入空闲状态,并告诉它在一段时间后恢复操作?

基本上,我想将邮箱用作消息队列,并控制队列的处理速度。

4

2 回答 2

24

不,您不能暂停演员:演员总是尽快从他们的邮箱中提取消息。这只留下传入请求被隐藏起来以供稍后处理的可能性:

class A(db: ActorRef) extends Actor with Stash {
  def receive = {
    case Request =>
      doWork()
      db ! Persist
      context.setReceiveTimeout(5.seconds)
      context.become({
        case Request        => stash()
        case Persisted      => context.unbecome(); unstashAll()
        case ReceiveTimeout => throw new TimeoutException("not persisted")
      }, discardOld = false)
  }
}

请注意,无法保证消息传递(或数据库可能已关闭),因此建议使用超时。

根本问题

这个问题主要出现在参与者模型和域模型之间没有很好对齐的情况下:参与者是一致性单位,但在您的用例中,您的一致图像需要一个最新的外部实体(数据库),以便演员做正确的事。如果不了解用例的更多信息,我无法推荐解决方案,但请尝试在考虑到这一点的情况下重塑您的问题。

于 2013-01-28T17:05:53.933 回答
6

事实证明,这只需要几行代码。这是我提出的解决方案,与 pagoda_5b 的建议一致:

class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
  import QueueingActor._

  def receive = {
    case message =>
      context.become({
        case Resume =>
          unstashAll()
          context.unbecome()
        case _ => stash()
      })
      nextActor ! message
  }
}

object QueueingActor {
  case class Resume()
}
于 2013-01-28T16:25:12.230 回答