14

我有时会发现自己有一些Stream[X], 和 a function X => Future Y, 我想将它们合并到 aFuture[Stream[Y]]中,但我似乎无法找到一种方法来做到这一点。例如,我有

val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)

val result : Future[Stream[String]] = ???

我试过了

 val result = Future.Traverse(x, toFutureString)

这给出了正确的结果,但似乎在返回 Future 之前消耗了整个流,这或多或少地打败了 purpse

我试过了

val result = x.flatMap(toFutureString)

但这不能编译type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]

val result = x.map(toFutureString)

返回有些奇怪和无用的Stream[Future[String]]

我应该在这里做什么来解决问题?

编辑:我没有被困在 a 上Stream,我同样会对 an 上的相同操作感到满意Iterator,只要它不会在开始处理头部之前阻止评估所有项目

Edit2:我不能 100% 确定 Future.Traverse 构造是否需要在返回 Future[Stream] 之前遍历整个流,但我认为确实如此。如果没有,这本身就是一个很好的答案。

Edit3:我也不需要结果是有序的,无论返回的流或迭代器是什么顺序,我都可以。

4

3 回答 3

9

您在正确的轨道上使用traverse,但不幸的是,在这种情况下,标准库的定义似乎有点破损 - 它不需要在返回之前使用流。

Future.traverse是一个更通用的函数的特定版本,它适用于以“可遍历”类型包装的任何应用函子(例如,有关更多信息,请参见这些 论文或我的答案)。

Scalaz库提供这个更通用的版本,它在这种情况下按预期工作(请注意,我正在获取Futurefrom的应用函子实例scalaz-contrib;它还没有在 Scalaz 的稳定版本中,它仍然与 Scala 2.9 交叉构建.2,没有这个Future):

import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._

import ExecutionContext.Implicits.global

def toFutureString(value: Int) = Future(value.toString)

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString

这会立即在无限流上返回,因此我们确定它不会首先被消耗。


作为脚注:如果您查看源代码Future.traverse您会发现它是根据 实现的foldLeft,这很方便,但在流的情况下不是必需的或不合适的。

于 2013-08-04T15:14:15.837 回答
2

忘记流:

import scala.concurrent.Future
import ExecutionContext.Implicits.global

val x = 1 to 10 toList
def toFutureString(value : Int) = Future {
  println("starting " + value)
  Thread.sleep(1000)
  println("completed " + value)
  value.toString
}

产量(在我的 8 核盒上):

scala> Future.traverse(x)(toFutureString)
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
res12: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@2d9472e2

scala> completed 1
completed 2
starting 9
starting 10
completed 3
completed 4
completed 5
completed 6
completed 7
completed 8
completed 9
completed 10

所以它们中的 8 个立即启动(每个内核一个,尽管可以通过线程池执行程序进行配置),然后随着这些完成更多的启动。Future[List[String]] 立即返回,然后在暂停后开始打印那些“完成的 x”消息。

当你有一个 List[Url's] 和一个 Url => Future[HttpResponseBody] 类型的函数时,可以使用这个示例。您可以使用该函数在该列表上调用 Future.traverse,并并行启动这些 http 请求,返回一个作为结果列表的未来。

是不是和你想要的一样?

于 2013-08-08T20:46:40.407 回答
0

接受的答案不再有效,因为现代版本的 Scalaztraverse()行为不同,并试图在调用时消耗整个流。

至于这个问题,我想说的是,不可能以真正的非阻塞方式实现这一目标。

Future[Stream[Y]]在可用之前无法解决Stream[Y]。并且由于Y是由函数异步生成的,因此X => Future[Y]您无法Y在遍历时不阻塞Stream[Y]。这意味着要么Future[Y]必须在解析之前解决所有问题Future[Stream[Y]](这需要消耗整个流),要么必须允许在遍历时出现块Stream[Y](在其基础期货尚未完成的项目上)。但是,如果我们允许阻塞遍历,那么结果未来的完成的定义是什么?从这个角度来看,它可能与Future.successful(BlockingStream[Y]). 这又在语义上等于原始的Stream[Future[Y]].

换句话说,我认为问题本身存在问题。

于 2018-12-13T12:30:29.280 回答