3

如何连接并读取 scala 中的连续(分块)http 流?例如,如果我有这个用 python/bottle 编写的简单服务:

from gevent import monkey; monkey.patch_all()

import gevent
from bottle import route, run

@route('/stream')
def stream():
    while True:
        yield 'blah\n'
        gevent.sleep(1)

run(host='0.0.0.0', port=8100, server='gevent')

我打算用它akka-stream来处理数据,我只需要一种方法来检索它。

4

1 回答 1

3

这应该有效。基本上,您对产生分块响应的 uri 执行单个请求。响应实体包含一个 dataBytes 流。在分块响应的情况下,这将是块流。在非分块响应 (HttpEntity.Strict) 的情况下,这将是一个只有一个块的流。

显然,您也可以显式匹配实体以查看它是否为 HttpEntity.Chunked,但通常您还希望保留处理非分块响应的能力。

在现实世界的应用程序中,您不会使用 runForeach 来执行副作用,而是使用 dataBytes 流进行一些处理。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{Uri, HttpRequest}
import akka.stream.ActorMaterializer

object ChunkTestClient extends App {

  implicit val system = ActorSystem("test")
  import system.dispatcher

  implicit val materializer = ActorMaterializer()
  val source = Uri("https://jigsaw.w3.org/HTTP/ChunkedScript")
  val finished = Http().singleRequest(HttpRequest(uri = source)).flatMap { response =>
    response.entity.dataBytes.runForeach { chunk =>
      println(chunk.utf8String)
    }
  }
}
于 2015-10-13T16:56:37.877 回答