5

我正在尝试使用 scalaz iteratee 包来处理恒定空间中的大型 zip 文件。我需要对 zip 文件中的每个文件执行一个长时间运行的过程。这些过程可以(并且应该)并行运行。

我创建了一个EnumeratorT将每个膨胀ZipEntry成一个File对象。签名看起来像:

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]

我想附加一个IterateeT将对每个文件执行长时间运行的过程。我基本上得到了类似的东西:

type IOE[A] = IoExceptionOr[A]

def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
  map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }

当我尝试运行它时:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get

我收到一条java.lang.OutOfMemoryError: Java heap space消息。这对我来说很有意义,因为它试图建立一个庞大的列表来存储所有这些IOPromise对象。

几个问题:

  • 有人对如何避免这种情况有任何想法吗?感觉就像我错误地解决了这个问题,因为我真的只关心longRunningProcess它的副作用。
  • 这里的Enumerator方法是错误的方法吗?

我几乎没有想法,所以任何事情都会有所帮助。

谢谢!

更新#1

这是堆栈跟踪:

[error] java.lang.OutOfMemoryError: Java heap space
[error]         at scalaz.Free.flatMap(Free.scala:46)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)

我目前正在接受 nadavwr 的建议,以确保一切都像我想的那样。我会报告任何更新。

更新#2

使用以下两个答案的想法,我找到了一个不错的解决方案。正如 huynhjl 建议的那样(我使用 nadavwr 的分析堆转储的建议进行了验证),consume导致每个膨胀ZipEntry都保存在内存中,这就是进程内存不足的原因。我更改consumefoldM更新了长时间运行的过程,只返回 aPromise[IOE[Unit]]而不是对文件的引用。这样我最后就有了所有 IoExceptions 的集合。这是工作解决方案:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
  map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }

此解决方案会在异步上传每个条目时对它们进行膨胀。Promise最后,我有一个包含任何错误的已完成对象的巨大列表。我仍然不完全相信这是对 Iteratee 的正确使用,但我现在确实有几个可重用、可组合的部分,可以在我们系统的其他部分中使用(这对我们来说是一种非常常见的模式)。

感谢你的帮助!

4

3 回答 3

4

不要使用consume. 请参阅我最近的另一个答案:如何在不溢出堆栈的情况下将 IO 与 Scalaz7 Iteratees 一起使用?

foldM可能是更好的选择。

还尝试将文件映射到其他内容(如成功返回代码),以查看这是否允许 JVM 垃圾收集膨胀的 zip 条目。

于 2013-04-26T14:57:06.630 回答
1

多贵(就内存而言,你的longRunningProcess?文件通缩怎么样?它们执行的次数是否达到了你期望的次数?(一个简单的计数器会很有帮助)

堆栈跟踪将有助于确定压垮骆驼的最后一根稻草——有时这就是罪魁祸首。

如果您想确定是什么占用了这么多内存,您可以使用-XX:+HeapDumpOnOutOfMemoryErrorJVM 参数,然后使用 VisualVM、Eclipse MAT 或其他堆分析器对其进行分析。

跟进

你列举承诺对我来说确实很奇怪。独立于枚举器和迭代器启动计算是违反直觉的。返回“惰性”元素而不是承诺的枚举器可能会更好地为基于迭代的解决方案提供服务。不幸的是,这将使您对单个文件的处理成为串行的,但这对您来说是迭代的——非阻塞流处理。

基于演员的解决方案更适合恕我直言,但演员和迭代者(尤其是后者)对于您要完成的事情(至少是您共享的部分)似乎都过分了。

请考虑 Scala 2.10 的 scala.concurrent 包中的普通期货/承诺,并且一定要看看 Scala 的并行集合。在证明不够充分之前,我不会在代码中引入其他概念。尝试定义一个固定大小的 ExecutionContext 来限制您的并行性。

于 2013-04-26T13:33:14.823 回答
0

我在快速阅读后开始回答,不知何故,我脑海中出现了“堆栈溢出”而不是“内存不足错误”......必须是 URL :-)

尽管如此,依赖递归的函数式计算很容易受到堆栈溢出的影响,所以我已经为任何偶然发现的人留下了答案,并承诺尝试提出一个更相关的答案。

如果您得到的是堆栈溢出,那么您将需要一个“蹦床”,这是一种在递归之间将您的计算提升到堆栈之外的构造。

请参阅Learning Scalaz Day 18中标题为“Stackless Scala with Free Monads”的部分,这是@eed3si9n 优秀系列文章的一部分。

另请参阅@mpilquist 的这个要点,展示了一个蹦床迭代。

于 2013-04-26T12:29:27.703 回答