5

akka-http用来向发送回分块响应的 http 服务发出请求。这就是相关代码的样子:

val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
    response.entity.dataBytes.runForeach { chunk =>
        println("-----")
        println(chunk.utf8String)
    }
}

命令行中生成的输出如下所示:

-----
{"data":
-----
"some text"}

-----
{"data":
-----
"this is a longer
-----
text"}

-----
{"data": "txt"}

-----
...

数据的逻辑部分 - 在这种情况下是一个 json 以行尾符号结束\r\n,但问题是,json 并不总是适合单个 http 响应块,如上面示例中清晰可见的那样。

我的问题是 - 我如何将传入的分块数据连接到完整的 json 中,以便生成的容器类型仍然是Source[Out,M1]or Flow[In,Out,M2]?我想遵循akka-stream.

更新:值得一提的是,响应是无止境的,聚合必须实时完成

4

3 回答 3

4

找到了解决方案:

val request: HttpRequest = //build the request
request.flatMap { response =>
    response.entity.dataBytes.scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
        .filter(_.contains("\r\n"))
        .runForeach { json =>
            println("-----")
            println(json)
        }
}
于 2015-10-22T12:41:12.923 回答
0

akka流文档在这个问题的食谱中有一个条目:“Parsing lines from a ByteString”。他们的解决方案非常冗长,但也可以处理单个块可以包含多行的情况。这似乎更健壮,因为块大小可以更改为大到足以处理多个 json 消息。

于 2015-10-28T19:08:02.173 回答
0
response.entity.dataBytes
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096))
.mapAsyncUnordered(Runtime.getRuntime.availableProcessors()) { data =>
  if (response.status == OK) {
    val event: Future[Event] = Unmarshal(data).to[Event]
    event.foreach(x => log.debug("Received event: {}.", x))
    event.map(Right(_))
  } else {
    Future.successful(data.utf8String)
      .map(Left(_))
  }
}

唯一的要求是您知道一条记录的最大大小。如果您从小事开始,默认行为是如果记录大于限制则失败。您可以将其设置为截断而不是失败,但是一段 JSON 没有意义。

于 2017-12-04T04:08:44.277 回答