你错过了另一个巨大的好处:内存效率。通过使用流式管道、客户端/服务器/客户端,各方可以安全地处理数据,而不会冒着破坏内存分配的风险。这在服务器端特别有用,您总是必须假设客户端可能会做一些恶意的事情......
客户端请求创建
假设您的数百万用户的最终来源是一个文件。您可以从此文件创建流源:
val userFilePath : java.nio.file.Path = ???
val userFileSource = akka.stream.scaladsl.FileIO(userFilePath)
此源可用于创建将用户流式传输到服务的 http 请求:
import akka.http.scaladsl.model.HttpEntity.{Chunked, ChunkStreamPart}
import akka.http.scaladsl.model.{RequestEntity, ContentTypes, HttpRequest}
val httpRequest : HttpRequest =
HttpRequest(uri = "http://filterService.io",
entity = Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, userFileSource))
此请求现在会将用户流式传输到服务,而不会将整个文件消耗到内存中。 一次只会缓冲大块数据,因此,您可以发送具有无限数量用户的请求,并且您的客户端会很好。
服务器请求处理
同样,您的服务器可以设计为接受具有可能无限长的实体的请求。
您的问题说该服务将过滤用户,假设我们有一个过滤功能:
val isValidUser : (String) => Boolean = ???
这可用于过滤传入的请求实体并创建一个响应实体来提供响应:
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.HttpEntity.Chunked
val route = extractDataBytes { userSource =>
val responseSource : Source[ByteString, _] =
userSource
.map(_.utf8String)
.filter(isValidUser)
.map(ByteString.apply)
complete(HttpResponse(entity=Chunked.fromData(ContentTypes.`text/plain(UTF-8)`,
responseSource)))
}
客户端响应处理
客户端可以类似地处理过滤后的用户,而无需将它们全部读入内存。例如,我们可以分派请求并将所有有效用户发送到控制台:
import akka.http.scaladsl.Http
Http()
.singleRequest(httpRequest)
.map { response =>
response
.entity
.dataBytes
.map(_.utf8String)
.foreach(System.out.println)
}