10

是否可以将来自 Akka HTTP的未知长度的外部流动态反序列化到域对象中?ByteString


语境

我称一个无限长的HTTP端点输出一个JSON Array不断增长的 a:

[
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    ...
] <- Never sees the daylight
4

3 回答 3

5

我想JsonFraming.objectScanner(Int.MaxValue)应该在这种情况下使用。正如文档所述:

返回一个 Flow,该 Flow 实现了基于“大括号计数”的框架运算符,用于发出有效的 JSON 块。它扫描传入的数据流以查找有效的 JSON 对象,并返回仅包含那些有效块的 ByteString 块。可能希望使用此运算符构建数据的典型示例包括:非常大的数组

所以你可以得到这样的结果:

val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))

response.onComplete {
  case Success(value) =>
    value.entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(_.utf8String)         // In case you have ByteString
      .map(decode[MyEntity](_))  // Use any Unmarshaller here
      .grouped(20)
      .runWith(Sink.ignore)      // Do whatever you need here 
  case Failure(exception) => log.error(exception, "Api call failed")
}
于 2019-02-05T16:01:55.457 回答
0

我在尝试将 Twitter 流(无限字符串)解析为域对象时遇到了一个非常相似的问题。我使用Json4s解决了它,如下所示:

case class Tweet(username: String, geolocation: Option[Geo])
case class Geo(latitude: Float, longitude: Float)
object Tweet{
    def apply(s: String): Tweet = {
        parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet]
    }
}

然后我只是缓冲流并将其映射到推文:

val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
var line = reader.readLine()
while(line != null){
    store(Tweet.apply(line))
    line = reader.readLine()
}

Json4s 完全支持 Option(或对象内部的自定义对象,例如示例中的 Geo)。因此,您可以像我一样放置一个Option,如果该字段没有进入Json,它将被设置为None。

希望能帮助到你!

于 2016-01-08T18:54:02.513 回答
-1

我认为play-iteratees-extras必须帮助你。这个库允许通过 Enumerator/Iteratee 模式解析 Json,当然,不要等待接收所有数据。

例如,不要构建代表“无限”Json 数组的“无限”字节流。

import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}

var i = 0
var isFirstWas = false

val max = 10000

val stream = Enumerator("[".getBytes) andThen Enumerator.generateM {
  Future {
    i += 1
    if (i < max) {
      val json = Json.stringify(Json.obj(
        "prop" -> Random.nextBoolean(),
        "prop2" -> Random.nextBoolean(),
        "prop3" -> Random.nextInt(),
        "prop4" -> Random.alphanumeric.take(5).mkString("")
      ))

      val string = if (isFirstWas) {
        "," + json
      } else {
        isFirstWas = true
        json
      }


      Some(Codec.utf_8.encode(string))
    } else if (i == max) Some("]".getBytes) // <------ this is the last jsArray closing tag
    else None

  }
}

好的,这个值包含 10000 个(或更多)对象的 jsArray。让我们定义将包含我们数组中每个对象的数据的案例类。

case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String)

现在编写解析器,它将解析每个项目

import play.extras.iteratees._    
import JsonBodyParser._
import JsonIteratees._
import JsonEnumeratees._

val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json =>
  for {
    prop <- json.\("prop").asOpt[Boolean]
    prop2 <- json.\("prop2").asOpt[Boolean]
    prop3 <- json.\("prop3").asOpt[Int]
    prop4 <- json.\("prop4").asOpt[String]
  } yield Props(prop, prop2, prop3, prop4)
}

请参阅docjsArray和. _ 构建结果生成器:jsValuesjsSimpleObject

val result = stream &> Encoding.decode() ><> parser

Encoding.decode()来自 JsonIteratees 包的字节将解码为CharString. resultvalue 具有类型Enumerator[Option[Item]],您可以将一些 iteratee 应用于此枚举器以开始解析过程。

总的来说,我不知道您如何接收字节(解决方案在很大程度上取决于此),但我认为这显示了您的问题的可能解决方案之一。

于 2015-12-15T15:31:26.883 回答