28

我将从这个问题开始:如何使用 Scala APIIteratee将文件上传到云存储(在我的情况下是 Azure Blob 存储,但我认为它现在不是最重要的)

背景:

我需要将输入分成大约 1 MB 的块,以便将大型媒体文件(300 MB+)存储为 Azure 的BlockBlobs. 不幸的是,我的 Scala 知识仍然很差(我的项目是基于 Java 的,Scala 的唯一用途是上传控制器)。

我尝试使用此代码:为什么调用错误或在 BodyParser 的 Iteratee 中完成请求在 Play Framework 2.0 中挂起?(as a Input Iteratee) - 它工作得很好,但Element我可以使用的每个大小为 8192 字节,因此对于将数百兆字节的文件发送到云来说太小了。

我必须说这对我来说是一种全新的方法,而且很可能我误解了一些东西(不想说我误解了一切;>)

我将不胜感激任何提示或链接,这将有助于我解决该主题。如果有任何类似用法的样本,那将是我了解这个想法的最佳选择。

4

4 回答 4

35

基本上,您首先需要的是将输入重新分块为更大的块,1024 * 1024 字节。

首先让我们有一个Iteratee将消耗最多 1m 字节的字节(可以让最后一个块更小)

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()

使用它,我们可以Enumeratee使用一个名为 grouped 的 API 构造一个(适配器)来重新组合块:

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

这里 grouped 使用 anIteratee来确定在每个块中放入多少。它为此使用了我们的consumeAMB。这意味着结果是将Enumeratee输入重新分块Array[Byte]为 1MB。

现在我们需要编写BodyParser,它将使用该Iteratee.foldM方法发送每个字节块:

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

foldM 传递一个状态,并在其传递的函数中使用它(S,Input[Array[Byte]]) => Future[S]来返回一个新的 Future 状态。foldM 将不会再次调用该函数,直到Future完成并且有可用的输入块。

并且正文解析器将重新分块输入并将其推送到存储中:

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

返回一个 Right 表示您在正文解析结束时返回一个正文(恰好是这里的处理程序)。

于 2012-08-15T03:12:16.250 回答
3

如果您的目标是流式传输到 S3,这里有一个我已经实现并测试过的助手:

def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
                (implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = {
  import scala.collection.JavaConversions._

  val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  val initResponse = s3.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped {
    Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
  }

  val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) =>
    val uploadRequest = new UploadPartRequest()
      .withBucketName(bucket)
      .withKey(key)
      .withPartNumber(etags.length + 1)
      .withUploadId(uploadId)
      .withInputStream(new ByteArrayInputStream(bytes))
      .withPartSize(bytes.length)

    val etag = Future { s3.uploadPart(uploadRequest).getPartETag }
    etag.map(etags :+ _)
  }

  val futETags = enum &> rechunker |>>> uploader

  futETags.map { etags =>
    val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
    s3.completeMultipartUpload(compRequest)
  }.recoverWith { case e: Exception =>
    s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
    Future.failed(e)
  }

}
于 2014-09-19T07:57:12.700 回答
0

对于那些也试图找出这个流问题的解决方案的人,你也可以使用已经在 parse.multipartFormData 中实现的东西,而不是编写一个全新的BodyParser。您可以实现类似下面的内容来覆盖默认处理程序handleFilePartAsTemporaryFile

def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = {
  handleFilePart {
    case FileInfo(partName, filename, contentType) =>

      (rechunkAdapter &>> writeToS3).map {
        _ =>
          val compRequest = new CompleteMultipartUploadRequest(...)
          amazonS3Client.completeMultipartUpload(compRequest)
          ...
      }
  }
}

def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)

我能够完成这项工作,但我仍然不确定整个上传过程是否是流式传输的。我尝试了一些大文件,似乎 S3 上传仅在从客户端发送整个文件时才开始。

我查看了上面的解析器实现,我认为一切都是使用 Iteratee 连接的,因此文件应该被流式传输。如果有人对此有所了解,那将非常有帮助。

于 2014-08-13T22:10:12.360 回答
0

将以下内容添加到您的配置文件中

play.http.parser.maxMemoryBuffer=256K

于 2016-01-11T23:02:17.070 回答