18

我已经阅读了一些关于通过 Iteratee 将文件发送到 S3 的可能性的内容,这似乎允许在我们收到文件时发送 S3 文件块,并避免大文件的 OutOfMemory 例如。

我发现这篇 SO 帖子可能几乎是我需要做的: Play 2.x : Reactive file upload with Iteratees 我真的不明白怎么做,或者它是否真的在 Play 2.0.2 中可用(因为 Sadek Brodi 说 foldM 仅在 Play 2.1 中可用)

对于阅读过一些关于 Iteratees 的博客但还不是 Scala/Play2 专家的人,有人可以用简单的方式解释一下吗?

我什至不知道我是否应该使用多部分正文解析器或类似的东西,但我知道的一件事是我不明白这段代码在做什么:

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1028*1028) &>> Iteratee.consume()

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

顺便说一句,与使用经典的 Java InputStream / OutputStream 相比,内存消耗会有什么不同。我实际上能够以非阻塞方式将 500mb 文件转发到 S3,内存消耗非常低,不使用 Iteratees,使用 Java + AsyncHttpClient + Grizzly(但我想它也适用于 Netty)。

那么使用Iteratee有什么好处呢?

我可以看到的一个区别是,我获得并转发到 S3 的 InputStream 在我的情况下由一个临时文件支持(这是一种 CXF 行为),因此它可能不像 Play Iteratee 那样具有反应性

但是对于 Iteratee,如果 Enumerator 产生连接接收到的字节并通过 Iteratee 将它们转发到 S3,那么如果与 S3 的连接不好并且字节不能非常快速地转发,那么“待处理”字节存储在哪里?

4

1 回答 1

5

简单的解释?我会尽力。:)

您正在用组件构建管道。一旦你构建了管道,它就可以被发送数据。它是一个Iteratee,所以它知道如何迭代数据。

您要上传的文件包含在请求正文中,而 BodyParser 是在 Play 中处理请求正文的。因此,您将迭代管道放入 BodyParser。当发出请求时,您的管道将被发送数据(它将对其进行迭代)。

您的管道 ( rechunkAdapter &>> writeToStore) 将数据分块为 1MB 位,然后将它们发送到 S3。

管道的第一部分 ( rechunkAdapter) 进行分块。它实际上有自己的迷你管道来进行分块(consumeAMB)。一旦迷你管道接收到足够的数据来组成一个块,它就会将它发送到主管道。

管道的第二部分 ( writeToStore) 就像一个循环,在每个块上被调用,让您有机会将每个块发送到 S3。

迭代器的优点?

一旦您知道发生了什么,您就可以通过将组件连接在一起来构建迭代管道。并且类型检查器通常会在您错误地将某些东西连接在一起时告诉您。

例如,我们可以修改上面的管道来修复它很慢的事实。它可能很慢,因为只要一个块准备好上传到 S3,请求上传就会暂停。放慢请求上传速度很重要,这样我们就不会耗尽内存,但我们可以通过添加一个固定大小的缓冲区来更宽容一点。所以只需添加Concurrent.buffer(2)到管道的中间以缓冲最多 2 个块。

Iteratees 为流提供了一种功能方法。这是优点还是缺点,取决于您对函数式编程的感受。:) 与惰性流(另一种功能方法)相比,迭代提供了对资源使用的精确控制。

最后,迭代器允许我们相对简单地(!)进行非常复杂的异步流编程。我们可以在不持有线程的情况下处理 IO,这是可扩展性的巨大胜利。经典的 Java InputStream/OutputStream 示例需要 2 个线程。

于 2013-11-13T22:59:07.017 回答