1

在接收到 dstream 后,在使用 kafka 和模式注册表的 spark 流中,如何将 dstream 批处理转换为 spark 中的 Dataframe?

从 confluent 使用 KafkaAvroDecoder 后的 Dstream 类型是 Dstream(String,Object)。当我使用下面的代码时,它会在 avro 列中将架构数据类型(例如 Int 更改为 Long)。

val kafkaStream: DStream[(String, Object)] =
      KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
    ssc, kafkaParams, Set(topic)
      )

  // Load JSON strings into DataFrame
  kafkaStream.foreachRDD { rdd =>
    // Get the singleton instance of SQLContext
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._

val topicValueStrings = rdd.map(_._2.toString)
    val df = sqlContext.read.json(topicValueStrings)

代码参考

Object.toSting 和读取为 json 会丢失 int 的架构。除了在数据框列中转换类型之外,还有其他方法吗?

4

0 回答 0