0

在回答这个问题时,Odomontois 展示了如何实现一个惰性分组运算符,它可以通过一个键对预先排序的流进行分组,而无需将整个内容存储在内存中。有没有办法用 Akka 的流(即源对象)做这样的事情?或者,有没有办法从 Akka 源中提取一个常规的 Stream 对象,这样我就可以使用 Odomontois 的chopBy?

这是一个完全失败的尝试,它不起作用:

  implicit class SourceChopOps[T, NU](s: Source[T, NU]) {
    def chopBy[U](f: T => U) = {
      s.prefixAndTail(1)
        .map(pt => (pt._1.head, pt._2))
        .map {
          case (prefix, tail) =>
            // what to do with pulled off head???
            tail.takeWhile(e => f(e) == f(prefix)) ++ tail.dropWhile(e => f(e) == f(prefix)).chopBy(f) // fails here
        }
      }
    }
  }
4

1 回答 1

0

groupBy在 Akka Streams 中,您将分组的密钥保留在内存中,但流区域总是“惰性”,因为它们具有背压,因此它将在有界内存中运行。如果下游不接受新元素,则上游不会产生新元素。

例如:

case class Record(id: Int)
Source.fromIterator(() => 
    Iterator
      .fill(1000)(Iterator(1,2).map { n => println("creating"); Record(n) })
      .flatten)
  .groupBy(maxSubstreams = 2, _.id)
  .map { r => println("Consuming"); r }
  .fold(0)((acc, _) => acc + 1)
  .mergeSubstreams
  .runForeach(println)

将向您展示如何Record在两个子流中的每一个中尽可能快地生成实例,而不是预先将它们全部消耗掉。

于 2016-09-09T15:45:48.850 回答