0

我们如何将 pojo 变量作为 json 生成到 kinesis flink 流中:

val inputStream: DataStream[Array[Byte]] = env.addSource {
  loadConsumerOrFail(config, jobName)
}
inputStream.print()
val transformedStream: DataStream[String] = inputStream.map { jsonstr =>
  val sJson = JsonMethods.parse((jsonstr.map(_.toChar)).mkString)
  val payloadJsonValue = sJson \ "line"

  implicit val formats = DefaultFormats
  val payvalue = JsonMethods.compact(JsonMethods.render(payloadJsonValue)).replace("\"", "")
  val payloadBytes = base64Decoder.decode(payvalue)

  val collectorPayload  = new CollectorPayload
  thriftDeserializer.deserialize(collectorPayload, payloadBytes)

  badStream(collectorPayload.ipAddress,
    collectorPayload.userAgent,
    collectorPayload.timestamp,
    collectorPayload.refererUri,
    collectorPayload.hostname,
    (sJson \ "failure_tstamp").extract[String],
    collectorPayload.body,
    collectorPayload.toString)

}
transformedStream.addSink(loadProducerOrFail(config, jobName))

这里transformedStream被下沉到另一个kinesis但是作为json但是如何转换为json

4

0 回答 0