在 scala 中,您将如何编写一个函数,该函数接受一个 Futures 序列,运行它们,不断重试任何失败并返回结果?
例如,签名可能是:
def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]
可配置超时的奖励点,此时函数失败并且被调用者可以处理这种情况。
如果该错误案例处理程序可以接收失败的期货列表,则奖励积分。
谢谢!
在 scala 中,您将如何编写一个函数,该函数接受一个 Futures 序列,运行它们,不断重试任何失败并返回结果?
例如,签名可能是:
def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]
可配置超时的奖励点,此时函数失败并且被调用者可以处理这种情况。
如果该错误案例处理程序可以接收失败的期货列表,则奖励积分。
谢谢!
def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
Future
.unit
.flatMap(_ => expr).map(v => Right(v))
.recoverWith {
case _ if n > 1 => retry(expr, n - 1)
case e => Future.failed(e).recover{case e => Left(e)}
}
}
结合
Future.sequence
转换List[Future[T]]
为Future[List[T]]
. 但是sequence
有快速失败的行为,因此我们不得不提升我们Future[T]
的Future[Either[Throwable, T]]
把这些部分放在一起,我们可以定义
def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
Future.sequence(futures.map(f => retry(f.apply())))
}
并像这样使用它
val futures = List(
() => Future(42),
() => Future(throw new RuntimeException("boom 1")),
() => Future(11),
() => Future(throw new RuntimeException("boom 2"))
)
waitRetryAll(futures)
.andThen { case v => println(v) }
哪个输出
Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))
我们可以collect
我们Left
的 s 或Right
s 并相应地恢复或继续处理,例如
waitRetryAll(futures)
.map(_.collect{ case v if v.isLeft => v })
...
注意我们是如何不得不通过List[() => Future[T]]
而不是List[Future[T]]
为了防止期货急切地开始。
据我记得在标准库中没有Future
超时实用程序。
您将如何中断/取消 JVM 上正在进行的计算?在一般情况下,你不能,你只能Thread
在它开启时中断,wait
但如果它从不开启wait
?用于异步计算(定义取消)的 IO 库将 IO 执行为一系列较小的不可中断任务(每个 map/flatMap 创建一个新步骤),如果它们收到取消/超时,那么它们将继续执行当前任务(因为它们无法停止它) 但他们不会开始下一个。您可以在超时时返回异常,但仍然会执行最后一步,因此如果它是一些副作用(例如 DB 操作),它将在您返回失败后完成。
这是不直观且棘手的,我认为这就是为什么没有将这种行为添加到标准库中的原因。
此外,未来正在进行中,可能会产生副作用。您不能获取类型的值Future[A]
并重新运行它。但是,您可以将未来作为按名称参数传递,以便.recoverWith
您可以重新创建未来。
可悲的是,您可以实现类似“重试直到 LocalDateTime.now - startTime >= ”,因为这是我认为您想要的:
def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant) =
future.recoverWith {
case error: Throwable =>
if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failure(error)
else retryHelper(future, attemptsLeft - 1, timeoutTime)
}
这可以结合起来Future.sequence
创建一个结果列表:
def retryFutures[A](list: List[() => Future[A]) = {
val attempts: Int = ...
val timeout: Instant = ...
Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}
如果您想跟踪哪个未来失败和哪个成功:
def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
future.map(a => Right(a))).recover {
case error: Throwable => Left(error)
}
def retryFutures[A](list: List[() => Future[A]) = {
val attempts: Int = ...
val timeout: Instant = ...
Future.sequence(list.map(future => retry(futureAttempt(future()), attempts, timeout)))
}
如果您不为取消 JVM 上的期货而烦恼,并且如果您有更多类似的情况,我建议您使用库。
如果您想使用为您实现重试的东西,则可以使用cats-retry
如果你想Future
在定义计算时有更好的东西(例如不需要你使用别名参数或空函数的东西)试试Monix或 ZIO(https://zio.dev/)