在回答这个问题时,Odomontois 展示了如何实现一个惰性分组运算符,它可以通过一个键对预先排序的流进行分组,而无需将整个内容存储在内存中。有没有办法用 Akka 的流(即源对象)做这样的事情?或者,有没有办法从 Akka 源中提取一个常规的 Stream 对象,这样我就可以使用 Odomontois 的chopBy?
这是一个完全失败的尝试,它不起作用:
implicit class SourceChopOps[T, NU](s: Source[T, NU]) {
def chopBy[U](f: T => U) = {
s.prefixAndTail(1)
.map(pt => (pt._1.head, pt._2))
.map {
case (prefix, tail) =>
// what to do with pulled off head???
tail.takeWhile(e => f(e) == f(prefix)) ++ tail.dropWhile(e => f(e) == f(prefix)).chopBy(f) // fails here
}
}
}
}