我正在尝试使用 scalaz iteratee 包来处理恒定空间中的大型 zip 文件。我需要对 zip 文件中的每个文件执行一个长时间运行的过程。这些过程可以(并且应该)并行运行。
我创建了一个EnumeratorT
将每个膨胀ZipEntry
成一个File
对象。签名看起来像:
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
我想附加一个IterateeT
将对每个文件执行长时间运行的过程。我基本上得到了类似的东西:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
consume[Promise[IOE[File]], IO, List] %=
map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
Promise { Thread.sleep(5000); iof }
当我尝试运行它时:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
我收到一条java.lang.OutOfMemoryError: Java heap space
消息。这对我来说很有意义,因为它试图建立一个庞大的列表来存储所有这些IO
和Promise
对象。
几个问题:
- 有人对如何避免这种情况有任何想法吗?感觉就像我错误地解决了这个问题,因为我真的只关心
longRunningProcess
它的副作用。 - 这里的
Enumerator
方法是错误的方法吗?
我几乎没有想法,所以任何事情都会有所帮助。
谢谢!
更新#1
这是堆栈跟踪:
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
我目前正在接受 nadavwr 的建议,以确保一切都像我想的那样。我会报告任何更新。
更新#2
使用以下两个答案的想法,我找到了一个不错的解决方案。正如 huynhjl 建议的那样(我使用 nadavwr 的分析堆转储的建议进行了验证),consume
导致每个膨胀ZipEntry
都保存在内存中,这就是进程内存不足的原因。我更改consume
并foldM
更新了长时间运行的过程,只返回 aPromise[IOE[Unit]]
而不是对文件的引用。这样我最后就有了所有 IoExceptions 的集合。这是工作解决方案:
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
Promise { Thread.sleep(5000); iof.map(println) }
此解决方案会在异步上传每个条目时对它们进行膨胀。Promise
最后,我有一个包含任何错误的已完成对象的巨大列表。我仍然不完全相信这是对 Iteratee 的正确使用,但我现在确实有几个可重用、可组合的部分,可以在我们系统的其他部分中使用(这对我们来说是一种非常常见的模式)。
感谢你的帮助!