0

我有一个数据流,其记录结构如下所示。我需要在 Flink 作业中对其进行解码,并把转换后的流写入 Kinesis。

{ "errors": [ { "level": "error", "message": "error: instance value (\"category\") not found in enum\n" } ], "failure_tstamp": "2021-02-22T18:30:06.276Z", "line": "CwBkAAAADjE5Mi4xNDMuNTcuMj" }

我已经尝试读取该流,但不确定在完成转换之后,如何把结果重新作为流输出(写入 sink)。

/** Builds the Kinesis producer for `config.output`, or aborts the job.
  *
  * @param config  job configuration; only its `output` member is read here
  * @param jobName job name, used only in the failure message
  * @return the producer carried by the `Right` result of `KinesisProducer(...)`
  * @throws RuntimeException wrapping the original exception when the result is a
  *                          `Left` holding an `Exception`
  */
private def loadProducerOrFail(
    config: JobConfig,
    jobName: String
): FlinkKinesisProducer[Array[Byte]] = {
  val attempt = KinesisProducer(config.output, () => new KinesisSerializer())
  attempt match {
    case Left(cause: Exception) =>
      // Keep the original exception as the cause so its stack trace is not lost.
      throw new RuntimeException(
        s"Job $jobName failed. Unable to connect to kinesis. Original error ${cause.getMessage}",
        cause
      )
    case Right(producer) =>
      println("got the producer" + producer)
      producer
  }
}

/** Job entry point: reads records from the source, decodes the base64/Thrift payload
  * held in each record's `line` field, re-encodes it as JSON and writes it to the
  * Kinesis sink.
  *
  * Relies on `config`, `jobName`, `loadConsumerOrFail`, `base64Decoder` and
  * `thriftDeserializer`, all defined outside this block.
  *
  * @param args command-line arguments (not used)
  */
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Fixed: this was declared as `Config` but populated as `consumerConfig`.
  // NOTE(review): nothing visible here passes these properties to the source;
  // confirm loadConsumerOrFail applies equivalent settings or hand them over.
  val consumerConfig = new Properties()
  consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
  consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

  // NOTE(review): the declared element type is Either[Array[Byte], Event], but the
  // map below treats every record as raw bytes. Confirm what loadConsumerOrFail
  // actually emits and align this annotation (or unwrap the Either) accordingly.
  val inputStream: DataStream[Either[Array[Byte], Event]] =
    env.addSource(loadConsumerOrFail(config, jobName))

  // The sink is a FlinkKinesisProducer[Array[Byte]], so the map must yield bytes.
  val transformed: DataStream[Array[Byte]] = inputStream.map { jsonstr =>
    val sJson = JsonMethods.parse((jsonstr.map(_.toChar)).mkString)
    val payloadJsonValue = sJson \ "line"
    implicit val formats = DefaultFormats

    // Render the `line` value and strip the surrounding quotes to get the base64 text.
    val payvalue = JsonMethods.compact(JsonMethods.render(payloadJsonValue)).replace("\"", "")
    val payloadBytes = base64Decoder.decode(payvalue)

    val collectorPayload = new CollectorPayload
    thriftDeserializer.deserialize(collectorPayload, payloadBytes)

    val p = collectorPayload.toString.replace("\\", "")

    // Drops the first 17 characters and the last one, presumably the
    // `CollectorPayload(` prefix and closing `)` of the toString output.
    // NOTE(review): `.asJson` is not json4s syntax, yet the result is passed to
    // JsonMethods.render; confirm which JSON library is intended here.
    val jval = p.substring(17, p.length() - 1).replace(" ", "").replace("\"", "").split(",").asJson

    JsonMethods
      .compact(JsonMethods.render(jval))
      .getBytes(java.nio.charset.StandardCharsets.UTF_8)
  }

  // Fixed: addSink belongs on the transformed stream. It used to be called on the
  // String inside the map lambda, so no sink was ever registered.
  transformed.addSink(loadProducerOrFail(config, jobName))

  env.execute("Flink Kinesis")
}
4

0 回答 0