1

使用Source Streaming与处理请求的常规方式相比有什么优势 ?我的理解是在这两种情况下

  1. TCP 连接将被重用
  2. 客户端和服务器之间将施加背压

我可以看到源流的唯一优点是如果响应非常大并且客户端更喜欢以较小的块使用它。

我的用例是我有一个很长的用户列表(数百万),我需要调用一个对用户执行一些过滤并返回一个子集的服务。

目前,在服务器端我公开了一个批处理 API,在客户端,我只是将用户分成 1000 个块,并使用 Akka HTTP Host API 并行进行 X 批处理调用。

我正在考虑切换到 HTTP 流,但不能完全弄清楚价值是什么

4

1 回答 1

1

你错过了另一个巨大的好处:内存效率。通过使用流式管道、客户端/服务器/客户端,各方可以安全地处理数据,而不会冒着破坏内存分配的风险。这在服务器端特别有用,您总是必须假设客户端可能会做一些恶意的事情......

客户端请求创建

假设您的数百万用户的最终来源是一个文件。您可以从此文件创建流源:

val userFilePath : java.nio.file.Path = ???

val userFileSource = akka.stream.scaladsl.FileIO(userFilePath)

此源可用于创建将用户流式传输到服务的 http 请求:

import akka.http.scaladsl.model.HttpEntity.{Chunked, ChunkStreamPart}
import akka.http.scaladsl.model.{RequestEntity, ContentTypes, HttpRequest}

val httpRequest : HttpRequest = 
  HttpRequest(uri = "http://filterService.io", 
              entity = Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, userFileSource))

此请求现在会将用户流式传输到服务,而不会将整个文件消耗到内存中。 一次只会缓冲大块数据,因此,您可以发送具有无限数量用户的请求,并且您的客户端会很好。

服务器请求处理

同样,您的服务器可以设计为接受具有可能无限长的实体的请求。

您的问题说该服务将过滤用户,假设我们有一个过滤功能:

val isValidUser : (String) => Boolean = ???

这可用于过滤传入的请求实体并创建一个响应实体来提供响应:

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.HttpEntity.Chunked

val route = extractDataBytes { userSource =>
  val responseSource : Source[ByteString, _] = 
    userSource
      .map(_.utf8String)
      .filter(isValidUser)
      .map(ByteString.apply)

  complete(HttpResponse(entity=Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, 
                                                responseSource)))
}

客户端响应处理

客户端可以类似地处理过滤后的用户,而无需将它们全部读入内存。例如,我们可以分派请求并将所有有效用户发送到控制台:

import akka.http.scaladsl.Http

Http()
  .singleRequest(httpRequest)
  .map { response =>
    response
      .entity
      .dataBytes
      .map(_.utf8String)
      .foreach(System.out.println)
  }
于 2018-03-21T17:27:15.967 回答