在接收到 dstream 后,在使用 kafka 和模式注册表的 spark 流中,如何将 dstream 批处理转换为 spark 中的 Dataframe?
从 confluent 使用 KafkaAvroDecoder 后的 Dstream 类型是 Dstream(String,Object)。当我使用下面的代码时,它会在 avro 列中将架构数据类型(例如 Int 更改为 Long)。
val kafkaStream: DStream[(String, Object)] =
KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
ssc, kafkaParams, Set(topic)
)
// Load JSON strings into DataFrame
kafkaStream.foreachRDD { rdd =>
// Get the singleton instance of SQLContext
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
val topicValueStrings = rdd.map(_._2.toString)
val df = sqlContext.read.json(topicValueStrings)
Object.toSting 和读取为 json 会丢失 int 的架构。除了在数据框列中转换类型之外,还有其他方法吗?