22

traverse来自对象的方法Future在第一次失败时停止。我想要这个方法的宽容/宽容版本,在发生错误时继续执行其余的序列。

目前,我们在 utils 中添加了以下方法:

def traverseFilteringErrors[A, B <: AnyRef]
                           (seq: Seq[A])
                           (f: A => Future[B]): Future[Seq[B]] = {
  val sentinelValue = null.asInstanceOf[B]
  val allResults = Future.traverse(seq) { x =>
    f(x) recover { case _ => sentinelValue }
  }
  val successfulResults = allResults map { result =>
    result.filterNot(_ == sentinelValue)
  }
  successfulResults
}

有一个更好的方法吗?

4

1 回答 1

23

真正有用的东西(一般来说)是能够将未来的错误提升为适当的价值。或者换句话说,将 aFuture[T]转换为 a Future[Try[T]](成功的返回值变成 aSuccess[T]而失败的情况变成 a Failure[T])。以下是我们可以如何实现它:

// Can also be done more concisely (but less efficiently) as:
// f.map(Success(_)).recover{ case t: Throwable => Failure( t ) }
// NOTE: you might also want to move this into an enrichment class
def mapValue[T]( f: Future[T] ): Future[Try[T]] = {
  val prom = Promise[Try[T]]()
  f onComplete prom.success
  prom.future
}

现在,如果您执行以下操作:

Future.traverse(seq)( f andThen mapValue )

您将获得一个 succesful Future[Seq[Try[A]]],其最终值包含Success每个成功未来的Failure实例,以及每个失败未来的实例。如果需要,您可以collect在此 seq 上使用来删除Failure实例并仅保留成功的值。

换句话说,您可以按如下方式重写您的辅助方法:

def traverseFilteringErrors[A, B](seq: Seq[A])(f: A => Future[B]): Future[Seq[B]] = {
  Future.traverse( seq )( f andThen mapValue ) map ( _ collect{ case Success( x ) => x } )
}
于 2013-04-03T00:42:07.600 回答