我有一个结构如下所示的数据流，需要用 Flink 作业对其进行解码，并将转换后的数据流写入 Kinesis。
{ "errors": [ { "level": "error", "message": "error: instance value (\"category\") not found in enum\n" } ], "failure_tstamp": "2021-02-22T18:30:06.276Z", "line": "CwBkAAAADjE5Mi4xNDMuNTcuMj" }
我已经能够读取该流，但不确定在转换之后如何把结果作为流输出（写入 Kinesis）。
/**
 * Builds the Kinesis sink for this job from `config.output`, using a fresh
 * `KinesisSerializer` supplied through a factory function.
 *
 * @param config  job configuration; only `config.output` is read here
 * @param jobName name of the job, used in the failure message
 * @return the producer returned by `KinesisProducer` when it yields a `Right`
 * @throws RuntimeException wrapping the original exception when `KinesisProducer`
 *         yields a `Left` holding an `Exception`
 */
private def loadProducerOrFail(config: JobConfig, jobName: String): FlinkKinesisProducer[Array[Byte]] = {
  val attempt = KinesisProducer(config.output, () => new KinesisSerializer())
  // NOTE(review): only Left values that are Exceptions are matched; any other Left
  // payload would surface as a MatchError — confirm KinesisProducer's error type.
  attempt match {
    case Left(cause: Exception) =>
      val message = s"Job $jobName failed. Unable to connect to kinesis. Original error ${cause.getMessage}"
      throw new RuntimeException(message, cause)
    case Right(producer) =>
      println(s"got the producer$producer")
      producer
  }
}
/**
 * Job entry point.
 *
 * Reads records from the Kinesis source and decodes each one. The steps are:
 *  1. parse the record as UTF-8 JSON;
 *  2. base64-decode its `"line"` field;
 *  3. Thrift-deserialize the result into a `CollectorPayload`;
 *  4. re-encode the payload as a JSON array.
 * The result is then written to the Kinesis sink.
 *
 * @param args command-line arguments (not read)
 */
def main(args: Array[String]): Unit = {
  import java.nio.charset.StandardCharsets

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // loadConsumerOrFail receives only `config` and `jobName`. Consumer settings
  // (e.g. AWS_REGION "us-east-1", STREAM_INITIAL_POSITION "LATEST") therefore have
  // to be carried by `config`; a standalone Properties object here would never
  // reach the consumer.
  // NOTE(review): `config` / `jobName` are not defined in this method; presumably
  // they are members of the enclosing object — confirm.
  val inputStream: DataStream[Either[Array[Byte], Event]] =
    env.addSource(loadConsumerOrFail(config, jobName))

  val transformed: DataStream[Array[Byte]] = inputStream
    // Raw record bytes are only available on the Left side of the Either.
    // NOTE(review): Right(Event) records are dropped here — confirm this job is
    // meant to process only the raw (Left) records.
    .flatMap(record => record.left.toOption.toList)
    .map { rawRecord =>
      implicit val formats: org.json4s.Formats = DefaultFormats
      // Decode as UTF-8; mapping each byte to a Char corrupts multi-byte characters.
      val recordJson = JsonMethods.parse(new String(rawRecord, StandardCharsets.UTF_8))
      // "line" holds a base64-encoded Thrift CollectorPayload.
      val encodedPayload = (recordJson \ "line").extract[String]
      val collectorPayload = new CollectorPayload
      thriftDeserializer.deserialize(collectorPayload, base64Decoder.decode(encodedPayload))
      // NOTE(review): this scrapes Thrift's debug toString(); removing all spaces
      // and splitting on "," mangles values that contain them (user agents, query
      // strings). Consider TSerializer with TSimpleJSONProtocol instead.
      val fields = collectorPayload.toString
        .replace("\\", "")
        .stripPrefix("CollectorPayload(")
        .stripSuffix(")")
        .replace(" ", "")
        .replace("\"", "")
        .split(",")
        .toList
      val outputJson = org.json4s.JArray(fields.map(org.json4s.JString(_)))
      // The sink is a FlinkKinesisProducer[Array[Byte]], so emit UTF-8 bytes.
      JsonMethods.compact(JsonMethods.render(outputJson)).getBytes(StandardCharsets.UTF_8)
    }

  // Sinks attach to a DataStream, not to individual values inside map().
  transformed.addSink(loadProducerOrFail(config, jobName))

  env.execute("Flink Kinesis")
}