Over the past few days I have been trying to figure out the best way to download an HTTP resource to a file using Akka Streams and HTTP.

Initially I started with the Future-based variant, which looked something like this:

def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}

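That was fine, but once I had learnt more about pure Akka Streams I wanted to try a Flow-based variant that builds the stream starting from a Source[HttpRequest]. At first this completely stumped me, until I stumbled upon the flatMapConcat flow transformation. It ended up a little more verbose: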

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
  case (responseTry, context) => (responseTry.get, context)
}

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
  case (response, _) => response.entity.dataBytes
}

def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSource).
    runWith(FileIO.toFile(file))
}

Then I wanted to get a little trickier and make use of the Content-Disposition header.

Going back to the Future-based variant:

def destinationFile(downloadDir: File, response: HttpResponse): File = {
  val fileName = response.header[ContentDisposition].get.value
  val file = new File(downloadDir, fileName)
  file.createNewFile()
  file
}

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val file = destinationFile(downloadDir, response)
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}

But now I have no idea how to do this with the Flow-based variant. This is as far as I got:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
  case (response, _) =>
    val source = responseToByteSource(in)
    val file = destinationFile(downloadDir, response)
    source.map((_, file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  val sourceWithDest: Source[(ByteString, File), Unit] = source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
  sourceWithDest.runWith(???)
}

So now I have a Source that will emit one or more (ByteString, File) elements for each File (I say each File because there is no reason the original Source has to be a single HttpRequest).

Is there any way to take these elements and route them to a dynamic Sink?

I am thinking of something like flatMapConcat, for example:

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???

so that I could complete downloadViaFlow2 with:

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
  val sink = FileIO.toFile(destination, true)
  Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}
sourceWithDest.runWithMap {
  case (_, file) => destToSink(file)
}

2 Answers


The solution does not require flatMapConcat. If you do not need any return value from the file writing, you can use Sink.foreach:

def writeFile(downloadDir : File)(httpResponse : HttpResponse) : Future[Long] = {
  val file = destinationFile(downloadDir, httpResponse)
  httpResponse.entity.dataBytes.runWith(FileIO.toFile(file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File) : Future[Unit] = {
  val request = HttpRequest(uri=uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()

  source.via(requestResponseFlow)
        .map(responseOrFail)
        .map(_._1)
        .runWith(Sink.foreach(writeFile(downloadDir)))
}

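Note that Sink.foreach creates Futures from the writeFile function, so there is not much back-pressure involved. The hard drive may slow writeFile down, but the stream will keep generating Futures. To control this you can use Flow.mapAsyncUnordered (or Flow.mapAsync):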

val parallelism = 10

source.via(requestResponseFlow)
      .map(responseOrFail)
      .map(_._1)
      .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
      .runWith(Sink.ignore)

If you want to accumulate the Long values to get a total count, you need to combine this with a Sink.fold:

source.via(requestResponseFlow)
      .map(responseOrFail)
      .map(_._1)
      .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
      .runWith(Sink.fold(0L)(_ + _))

The fold keeps a running sum and emits the final value once the source of requests has dried up.
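
One caveat about destinationFile as it is used by writeFile: as far as I can tell, calling .value on a Content-Disposition header returns the whole rendered value (for example attachment; filename="report.pdf") rather than just the file name. Below is a minimal sketch, using only the Scala standard library, of pulling the filename parameter out of such a value. The names ContentDispositionSketch and fileNameFrom are hypothetical and not part of Akka HTTP; the sketch ignores the RFC 5987 filename* form and keeps only the last path segment, so that a header cannot point outside the download directory.

```scala
import java.io.File

// Hypothetical helper, not part of Akka HTTP: extracts the plain `filename`
// parameter from a raw Content-Disposition header value.
object ContentDispositionSketch {
  // Matches filename="quoted value" or filename=token, case-insensitively.
  // The extended `filename*=` form is deliberately not matched.
  private val FileNameParam = """(?i)filename\s*=\s*("([^"]*)"|[^;\s]+)""".r

  def fileNameFrom(headerValue: String): Option[String] =
    FileNameParam
      .findFirstMatchIn(headerValue)
      // Group 2 is the text inside quotes; it is null for the unquoted form.
      .map(m => Option(m.group(2)).getOrElse(m.group(1)))
      // Keep only the last path segment to avoid writing outside the target directory.
      .map(raw => new File(raw.trim).getName)
      .filter(_.nonEmpty)
}
```

destinationFile could then fall back to a default name when the result is None, instead of calling .get on an Option that may be empty.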

answered 2016-01-23T20:07:17.090

Using the Play Web Services client injected as ws, and remembering to import scala.concurrent.duration._:

def downloadFromUrl(url: String)(ws: WSClient): Future[Try[File]] = {
  // java.io.File.createTempFile takes (prefix, suffix, directory)
  val file = File.createTempFile("my-prefix", ".tmp", new File("/tmp"))
  file.deleteOnExit()

  val futureResponse: Future[WSResponse] =
    ws.url(url).withMethod("GET").withRequestTimeout(5 minutes).stream()

  futureResponse.flatMap { res =>
    res.status match {
      case 200 =>
        val outputStream = java.nio.file.Files.newOutputStream(file.toPath)

        val sink = Sink.foreach[ByteString] { bytes => outputStream.write(bytes.toArray) }

        res.bodyAsSource.runWith(sink).andThen {
          case result =>
            outputStream.close()
            result.get
        } map (_ => Success(file))
      case other => Future(Failure[File](new Exception("HTTP Failure, response code " + other + " : " + res.statusText)))
    }
  }
}
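
Note that the Future[Try[File]] result has two failure channels: a non-200 status arrives as a Failure inside a successful Future, while a connection or stream error fails the Future itself. The following is a small usage sketch, assuming only the standard library, of a caller handling both. The names DownloadResultSketch and describe are hypothetical.

```scala
import java.io.File
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// Hypothetical helper: collapses both failure channels into a readable message.
object DownloadResultSketch {
  def describe(result: Future[Try[File]])(implicit ec: ExecutionContext): Future[String] =
    result
      .map {
        case Success(file) => s"saved to ${file.getPath}"   // 200 and body written
        case Failure(e)    => s"rejected: ${e.getMessage}"  // e.g. non-200 status
      }
      // A failed Future (e.g. the connection dropped mid-stream) ends up here.
      .recover { case e => s"failed: ${e.getMessage}" }
}
```

A caller would pass the result of downloadFromUrl(url)(ws) to describe, with an implicit ExecutionContext in scope.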
answered 2018-08-10T00:08:01.923