1

我有一个在应用程序启动时作为另一个actor的子级创建的actor,并且每天从父级接收一条消息,以执行从某个SFTP服务器获取一些文件的操作。

现在,可能有一些小的临时连接异常会导致操作失败。在这种情况下,需要重试。

但是可能会抛出异常并且在重试时不会解决(例如:找不到文件,某些配置不正确等)

因此,在这种情况下,考虑到参与者将在很长一段时间后(每天一次)接收消息,什么可能是适当的重试机制和监督策略。

在这种情况下,发送给actor的消息不是错误的输入——它只是一个触发器。例子:

case object FileFetch

如果我在父母中有这样的监督策略,它将在每个次要/主要异常上重新启动失败的孩子而不重试。

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
    case _: Exception                => Restart
}

我想要的是这样的:

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
    case _: MinorException           => Retry same message 2, 3 times and then Restart
    case _: Exception                => Restart
}
4

1 回答 1

4

在发生异常时“重试”或重新发送消息是您必须自己实现的。从文档中:

如果在处理消息时抛出异常(即从其邮箱中取出并移交给当前行为),则该消息将丢失。重要的是要了解它不会放回邮箱。因此,如果您想重试处理消息,您需要通过捕获异常并重试您的流程来自己处理它。确保限制重试次数,因为您不希望系统活锁(因此消耗大量 cpu 周期而没有取得进展)。

如果您想在不重新启动孩子FileFetch的情况下将消息重新发送给MinorException孩子,那么您可以在孩子中捕获异常以避免触发监督策略。在 try-catch 块中,您可以向父级发送消息并让父级跟踪重试次数(例如,如果您希望父级制定某种退避策略,则可能在此消息中包含时间戳) . 在孩子身上:

def receive = {
  case FileFetch =>
    try {
      ...
    } catch {
      case m: MinorException =>
        val now = System.nanoTime
        context.parent ! MinorIncident(self, now)
    }
  case ...
} 

在父级中:

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
    case _: Exception => Restart
  }

var numFetchRetries = 0

def receive = {
  case MinorIncident(fetcherRef, time) =>
    log.error(s"${fetcherRef} threw a MinorException at ${time}")
    if (numFetchRetries < 3) { // possibly use the time in the retry logic; e.g., a backoff
      numFetchRetries = numFetchRetries + 1
      fetcherRef ! FileFetch
    } else {
      numFetchRetries = 0
      context.stop(fetcherRef)
      ... // recreate the child
    }
  case SomeMsgFromChildThatFetchSucceeded =>
    numFetchRetries = 0
  case ...
}

或者,您可以Resume在发生 a时将主管策略设置为子级,而不是在子级中捕获异常MinorException,同时仍然让父级处理消息重试逻辑:

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
    case m: MinorException =>
      val child = sender()
      val now = System.nanoTime
      self ! MinorIncident(child, now)
      Resume
    case _: Exception => Restart
  }
于 2018-01-25T14:35:35.903 回答