有没有办法将 Seq[Future[X]] 变成Enumerator[X]?用例是我想通过爬网来获取资源。这将返回一个期货序列,我想返回一个枚举器,它将按照它们首先完成的顺序将期货推送到 Iteratee。
看起来 Victor Klang 的Future select gist可以用来做到这一点——尽管它看起来效率很低。
注意:有问题的迭代器和枚举器是播放框架版本 2.x 给出的那些,即具有以下导入:import play.api.libs.iteratee._
有没有办法将 Seq[Future[X]] 变成Enumerator[X]?用例是我想通过爬网来获取资源。这将返回一个期货序列,我想返回一个枚举器,它将按照它们首先完成的顺序将期货推送到 Iteratee。
看起来 Victor Klang 的Future select gist可以用来做到这一点——尽管它看起来效率很低。
注意:有问题的迭代器和枚举器是播放框架版本 2.x 给出的那些,即具有以下导入:import play.api.libs.iteratee._
/**
* "Select" off the first future to be satisfied. Return this as a
* result, with the remainder of the Futures as a sequence.
*
* @param fs a scala.collection.Seq
*/
def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext):
Future[(Try[A], Seq[Future[A]])] = {
@scala.annotation.tailrec
def stripe(p: Promise[(Try[A], Seq[Future[A]])],
heads: Seq[Future[A]],
elem: Future[A],
tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = {
elem onComplete { res => if (!p.isCompleted) p.trySuccess((res, heads ++ tail)) }
if (tail.isEmpty) p.future
else stripe(p, heads :+ elem, tail.head, tail.tail)
}
if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!"))
else stripe(Promise(), fs.genericBuilder[Future[A]].result, fs.head, fs.tail)
}
}
然后我可以得到我需要的东西
Enumerator.unfoldM(initialSeqOfFutureAs){ seqOfFutureAs =>
if (seqOfFutureAs.isEmpty) {
Future(None)
} else {
FutureUtil.select(seqOfFutureAs).map {
case (t, seqFuture) => t.toOption.map {
a => (seqFuture, a)
}
}
}
}
一个更好、更短且我认为更有效的答案是:
def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] {
def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] = {
Future.sequence(seqFutureX).flatMap { seqX: Seq[X] =>
seqX.foldLeft(Future.successful(i)) {
case (i, x) => i.flatMap(_.feed(Input.El(x)))
}
}
}
}
我确实意识到这个问题已经有点老了,但是根据 Santhosh 的回答和内置的 Enumterator.enumerate() 实现,我想出了以下内容:
def enumerateM[E](traversable: TraversableOnce[Future[E]])(implicit ec: ExecutionContext): Enumerator[E] = {
val it = traversable.toIterator
Enumerator.generateM {
if (it.hasNext) {
val next: Future[E] = it.next()
next map {
e => Some(e)
}
} else {
Future.successful[Option[E]] {
None
}
}
}
}
请注意,与第一个基于 Viktor 选择的解决方案不同,这个解决方案保留了顺序,但您仍然可以在之前异步启动所有计算。因此,例如,您可以执行以下操作:
// For lack of a better name
def mapEachM[E, NE](eventuallyList: Future[List[E]])(f: E => Future[NE])(implicit ec: ExecutionContext): Enumerator[NE] =
Enumerator.flatten(
eventuallyList map { list =>
enumerateM(list map f)
}
)
当我偶然发现这个线程时,后一种方法实际上是我正在寻找的。希望它可以帮助某人!:)
您可以使用 Java 执行器完成服务 ( JavaDoc ) 构建一个。这个想法是使用创建一系列新期货,每个期货ExecutorCompletionService.take()
用于等待下一个结果。当前一个未来有结果时,每个未来都会开始。
但请注意,这可能效率不高,因为很多同步发生在幕后。使用一些并行 map reduce 进行计算(例如使用 Scala 的 ParSeq)并让 Enumerator 等待完整的结果可能会更有效。
警告:在回答之前未编译
像这样的东西怎么样:
def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] {
def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] =
Future.fold(seqFutureX)(i){ case (i, x) => i.flatMap(_.feed(Input.El(x)))) }
}
这是我发现很方便的东西,
def unfold[A,B](xs:Seq[A])(proc:A => Future[B])(implicit errorHandler:Throwable => B):Enumerator[B] = {
Enumerator.unfoldM (xs) { xs =>
if (xs.isEmpty) Future(None)
else proc(xs.head) map (b => Some(xs.tail,b)) recover {
case e => Some((xs.tail,errorHandler(e)))
}
}
}
def unfold[A,B](fxs:Future[Seq[A]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = {
(unfold(Seq(fxs))(fxs => fxs)(errorHandler1)).flatMap(unfold(_)(proc)(errorHandler))
}
def unfoldFutures[A,B](xsfxs:Seq[Future[Seq[A]]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = {
xsfxs.map(unfold(_)(proc)).reduceLeft((a,b) => a.andThen(b))
}
我想建议使用广播
def seqToEnumerator[A](futuresA: Seq[Future[A]])(defaultValue: A, errorHandler: Throwable => A): Enumerator[A] ={
val (enumerator, channel) = Concurrent.broadcast[A]
futuresA.foreach(f => f.onComplete({
case Success(Some(a: A)) => channel.push(a)
case Success(None) => channel.push(defaultValue)
case Failure(exception) => channel.push(errorHandler(exception))
}))
enumerator
}
我添加了 errorHandling 和 defaultValues,但您可以使用 onSuccess 或 onFailure 跳过它们,而不是 onComplete