4

在 scala 中,您将如何编写一个函数,该函数接受一个 Futures 序列,运行它们,不断重试任何失败并返回结果?

例如,签名可能是:

  def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]

可配置超时的奖励点,此时函数失败并且被调用者可以处理这种情况。
如果该错误案例处理程序可以接收失败的期货列表,则奖励积分。

谢谢!

4

2 回答 2

4

基于Retry 一个返回 Future 的函数考虑

def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
  Future
    .unit
    .flatMap(_ => expr).map(v => Right(v))
    .recoverWith {
      case _ if n > 1 => retry(expr, n - 1)
      case e => Future.failed(e).recover{case e => Left(e)}
    }
}

结合

Future.sequence

转换List[Future[T]]Future[List[T]]. 但是sequence有快速失败的行为,因此我们不得不提升我们Future[T]Future[Either[Throwable, T]]

把这些部分放在一起,我们可以定义

def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
  Future.sequence(futures.map(f => retry(f.apply())))
}

并像这样使用它

val futures = List(
  () => Future(42),
  () => Future(throw new RuntimeException("boom 1")),
  () => Future(11),
  () => Future(throw new RuntimeException("boom 2"))
)

waitRetryAll(futures)
  .andThen { case v => println(v) }

哪个输出

Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))

我们可以collect我们Left的 s 或Rights 并相应地恢复或继续处理,例如

waitRetryAll(futures)
  .map(_.collect{ case v if v.isLeft => v })
  ...

注意我们是如何不得不通过List[() => Future[T]]而不是List[Future[T]]为了防止期货急切地开始。

于 2020-01-08T00:14:35.647 回答
3

据我记得在标准库中没有Future超时实用程序。

您将如何中断/取消 JVM 上正在进行的计算?在一般情况下,你不能,你只能Thread在它开启时中断,wait但如果它从不开启wait?用于异步计算(定义取消)的 IO 库将 IO 执行为一系列较小的不可中断任务(每个 map/flatMap 创建一个新步骤),如果它们收到取消/超时,那么它们将继续执行当前任务(因为它们无法停止它) 但他们不会开始下一个。您可以在超时时返回异常,但仍然会执行最后一步,因此如果它是一些副作用(例如 DB 操作),它将在您返回失败后完成。

这是不直观且棘手的,我认为这就是为什么没有将这种行为添加到标准库中的原因。

此外,未来正在进行中,可能会产生副作用。您不能获取类型的值Future[A]并重新运行它。但是,您可以将未来作为按名称参数传递,以便.recoverWith您可以重新创建未来。

可悲的是,您可以实现类似“重试直到 LocalDateTime.now - startTime >= ”,因为这是我认为您想要的:

def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant) =
  future.recoverWith {
    case error: Throwable =>
      if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failure(error)
      else retryHelper(future, attemptsLeft - 1, timeoutTime)
  }

这可以结合起来Future.sequence创建一个结果列表:

def retryFutures[A](list: List[() => Future[A]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}

如果您想跟踪哪个未来失败和哪个成功:

def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
  future.map(a => Right(a))).recover {
    case error: Throwable => Left(error)
  }

def retryFutures[A](list: List[() => Future[A]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(futureAttempt(future()), attempts, timeout)))
}

如果您不为取消 JVM 上的期货而烦恼,并且如果您有更多类似的情况,我建议您使用库。

如果您想使用为您实现重试的东西,则可以使用cats-retry

如果你想Future在定义计算时有更好的东西(例如不需要你使用别名参数或空函数的东西)试试Monix或 ZIO(https://zio.dev/

于 2020-01-08T00:28:50.530 回答