How can we write a POJO as JSON to a Kinesis stream from Flink:
val inputStream: DataStream[Array[Byte]] = env.addSource {
  loadConsumerOrFail(config, jobName)
}
inputStream.print()

val transformedStream: DataStream[String] = inputStream.map { jsonstr =>
  // Decode the raw bytes into a string and parse it as JSON.
  val sJson = JsonMethods.parse((jsonstr.map(_.toChar)).mkString)
  val payloadJsonValue = sJson \ "line"
  implicit val formats = DefaultFormats
  // Render the "line" field and strip the surrounding quotes to get the Base64 text.
  val payvalue = JsonMethods.compact(JsonMethods.render(payloadJsonValue)).replace("\"", "")
  val payloadBytes = base64Decoder.decode(payvalue)
  // Deserialize the Thrift-encoded collector payload.
  val collectorPayload = new CollectorPayload
  thriftDeserializer.deserialize(collectorPayload, payloadBytes)
  // This builds a badStream POJO, not a String; it is the value that should be emitted as JSON.
  badStream(collectorPayload.ipAddress,
    collectorPayload.userAgent,
    collectorPayload.timestamp,
    collectorPayload.refererUri,
    collectorPayload.hostname,
    (sJson \ "failure_tstamp").extract[String],
    collectorPayload.body,
    collectorPayload.toString)
}

transformedStream.addSink(loadProducerOrFail(config, jobName))
Here transformedStream is sunk to another Kinesis stream, and the records should be written as JSON. How do I convert the badStream object to JSON?
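One possible direction, sketched under assumptions: since the code already uses json4s (JsonMethods, DefaultFormats), the object could be serialized with json4s's Serialization.write before it leaves the map function. The sketch below assumes the Jackson backend of json4s and uses a hypothetical case class BadRow as a stand-in for badStream, whose real definition is not shown in the question; the field names and types are guesses.

```scala
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.Serialization

// Hypothetical stand-in for the badStream POJO; field names and types are assumptions.
final case class BadRow(
  ipAddress: String,
  userAgent: String,
  timestamp: Long,
  refererUri: String,
  hostname: String,
  failureTstamp: String,
  body: String,
  raw: String
)

object BadRowJson {
  // json4s needs an implicit Formats in scope for serialization.
  implicit val formats: Formats = DefaultFormats

  // Serialize the case class to a compact JSON string.
  def toJson(row: BadRow): String = Serialization.write(row)
}
```

If badStream is a case class, the last expression of the map function could then be wrapped as Serialization.write(badStream(...)), so that the function really returns a String and matches DataStream[String]. Whether the sink returned by loadProducerOrFail accepts strings depends on the serialization schema it was built with, which the question does not show.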