我一直在使用 Scalaz 7 迭代器来处理恒定堆空间中的大型(即无界)数据流。
在代码中,它看起来像这样:
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]
def processChunk(c: Chunk): Result
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, List[Result]] =
Iteratee.fold[Chunk, ErrorOr, List[Result]](Nil) { (rs, c) =>
processChunk(c) :: rs
} &= data
现在我想并行执行处理,一次处理P块数据。我仍然必须限制堆空间,但可以合理地假设有足够的堆来存储P块数据和计算的累积结果。
我知道Task
该类并考虑映射枚举器以创建任务流:
data map (c => Task.delay(processChunk(c)))
但我仍然不确定如何管理非确定性。在消费流时,如何确保P任务尽可能运行?
第一次尝试:
我的第一个解决方案是折叠流并创建一个 ScalaFuture
来处理每个块。但是,该程序因 GC 开销错误而崩溃(可能是因为它在尝试创建所有Future
s 时将所有块拉入内存)。相反,当已经有P个任务在运行时,迭代者需要停止使用输入,并在这些任务中的任何一个完成时再次恢复。
第二次尝试:
我的下一个尝试是将流分组为P大小的部分,并行处理每个部分,然后在继续下一部分之前加入:
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, Vector[Result]] =
Iteratee.foldM[Vector[Chunk], ErrorOr, Vector[Result]](Nil) { (rs, cs) =>
tryIO(IO(rs ++ Await.result(
Future.traverse(cs) {
c => Future(processChunk(c))
},
Duration.Inf)))
} &= (data mapE Iteratee.group(P))
虽然这不会充分利用可用的处理器(特别是因为处理每个处理器所需的时间Chunk
可能差别很大),但这将是一个改进。但是,枚举group
对象似乎泄漏了内存——堆使用率突然飙升。