0

如何将 aSource[String, Unit]连接到流媒体演员?

我认为https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33StreamingActor的修改版本会很好用,但我很难连接这些部分。

鉴于source: Source[String, Unit]and ctx: RequestContext,我认为修改后的StreamingActor应该与actorRefFactory.actorOf(fromSource(source, ctx)).

供参考,上面的要点:

import akka.actor._
import akka.util.ByteString
import spray.http.HttpEntity.Empty
import spray.http.MediaTypes._
import spray.http._
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp}

object StreamingActor {

  // helper methods

  def fromString(iterable: Iterable[String], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromStringAndCharset(iterable: Iterable[String], ctx: RequestContext, charset: HttpCharset): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromByteArray(iterable: Iterable[Array[Byte]], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromByteString(iterable: Iterable[ByteString], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromHttpData(iterable: Iterable[HttpData], ctx: RequestContext): Props = {
    Props(new StreamingActor(iterable, ctx))
  }

  // initial message sent by StreamingActor to itself
  private case object FirstChunk

  // confirmation that given chunk was sent to client
  private case object ChunkAck

}

class StreamingActor(chunks: Iterable[HttpData], ctx: RequestContext) extends Actor with HttpService with ActorLogging {

  import StreamingActor._

  def actorRefFactory = context

  val chunkIterator: Iterator[HttpData] = chunks.iterator

  self ! FirstChunk

  def receive = {

    // send first chunk to client
    case FirstChunk if chunkIterator.hasNext =>
      val responseStart = HttpResponse(entity = HttpEntity(`text/html`, chunkIterator.next()))
      ctx.responder ! ChunkedResponseStart(responseStart).withAck(ChunkAck)

    // data stream is empty. Respond with Content-Length: 0 and stop
    case FirstChunk =>
      ctx.responder ! HttpResponse(entity = Empty)
      context.stop(self)

    // send next chunk to client  
    case ChunkAck if chunkIterator.hasNext =>
      val nextChunk = MessageChunk(chunkIterator.next())
      ctx.responder ! nextChunk.withAck(ChunkAck)

    // all chunks were sent. stop.  
    case ChunkAck =>
      ctx.responder ! ChunkedMessageEnd
      context.stop(self)

    //   
    case x => unhandled(x)
  }

}
4

1 回答 1

2

我认为您对 a 的使用StreamingActor过度复杂化了您试图解决的潜在问题。此外,问题中的 StreamingActor 将为HttpResponse单个HttpRequest. 这是低效的,因为您可以简单地返回 1 个 HttpReponse,并将 anHttpEntity.Chunked作为数据流源的实体。

通用并发设计

Actors 用于状态,例如维护连接之间的运行计数器。即便如此, a 还是Agent涵盖了很多领域,并具有类型检查的额外好处(与 Actor.receive 不同,它在运行时将死信邮箱变成您唯一的类型检查器)。

并发计算,而不是状态,应该(按顺序)处理:

  1. Futures 作为首要考虑因素:可组合、编译时类型安全检查,是大多数情况下的最佳选择。

  2. akka Streams :可组合的,编译时类型安全检查,非常有用,但由于方便的背压功能会产生很多开销。Steam 也是 HttpResponse 实体的形成方式,如下所示。

流式传输 CSV 文件

您的基本问题是如何使用 Streams 将 csv 文件流式传输到 http 客户端。您可以从创建数据源并将其嵌入到 HttpResponse 开始:

def lines() = scala.io.Source.fromFile("DataFile.csv").getLines()

import akka.util.ByteString
import akka.http.model.HttpEntity

def chunkSource : Source[HttpEntity.ChunkStreamPart, Unit] = 
  akka.stream.scaladsl.Source(lines)
                      .map(ByteString.apply)
                      .map(HttpEntity.ChunkStreamPart.apply)

def httpFileResponse = 
  HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, chunkSource))

然后,您可以为任何请求提供此响应:

val fileRequestHandler = {
  case HttpRequest(GET, Uri.Path("/csvFile"), _, _, _) => httpFileResponse
}   

然后将 fileRequestHandler 嵌入到您的服务器路由逻辑中。

于 2015-11-05T13:38:28.050 回答