1

我已经有一个Source[T],但我需要将它传递给一个需要Stream[T].

我可以.run获取源并将所有内容具体化到一个列表中,然后.toStream对结果执行一个操作,但这会删除我想要保留的惰性/流方面。

这是完成此任务的唯一方法还是我错过了什么?

编辑

阅读弗拉基米尔的评论后,我相信我以错误的方式处理我的问题。这是我拥有的和想要创建的简单示例:

// Given a start index, returns a list from startIndex to startIndex+10.  Stops at 50.
def getMoreData(startIndex: Int)(implicit ec: ExecutionContext): Future[List[Int]] = {
  println(s"f running with $startIndex")
  val result: List[Int] = (startIndex until Math.min(startIndex + 10, 50)).toList
  Future.successful(result)
}

因此getMoreData,只需模拟一项服务,该服务按页面返回数据给我。我的第一个目标是创建以下功能:

def getStream(startIndex: Int)(implicit ec: ExecutionContext): Stream[Future[List[Int]]]

其中Future[List[Int]]流中的下一个元素取决于前一个元素,从流中前一个 Future 的值中读取最后一个索引。例如 startIndex 为 0:

getStream(0)(0) would return Future[List[0 until 10]]
getStream(0)(1) would return Future[List[10 until 20]]
... etc

一旦我有了那个函数,我就想创建一个第二个函数来进一步将它映射到一个简单的Stream[Int]. 例如:

def getFlattenedStream(stream: Stream[Future[List[Int]]]): Stream[Int]

Streams 开始感觉不适合这项工作,我应该只写一个简单的循环。我喜欢流的想法,因为消费者可以按照他们认为合适的方式映射/修改/使用流,而生产者不需要知道它。

4

1 回答 1

0

Scala Streams 是在 内完成任务的好方法getStream;这是构建您要查找的内容的基本方法:

def getStream(startIndex : Int)
             (implicit ec: ExecutionContext): Stream[Future[List[Int]]] = 
  Stream
    .from(startIndex, 10)
    .map(getMoreData)

事情变得棘手的地方在于您的getFlattenedStream功能。可以消除Future围绕您的值的包装器List[Int],但它需要一个Await.result通常是错误的函数调用。

通常最好对 Future 进行操作并允许异步操作自行发生。如果您分析您的最终要求/目标,通常不需要等待未来。

但是,如果你绝对必须放弃 Future 那么这里是可以完成它的代码:

val awaitDuration = 10 Seconds

def getFlattenedStream(stream: Stream[Future[List[Int]]]): Stream[Int] = 
  stream
    .map(f => Await.result(f, awaitDuration))
    .flatten
于 2017-01-17T12:09:42.710 回答