我有一个非标准的 kafka 格式消息,所以代码如下所示
val df:Dataset[String] = spark
.readStream
.format("kafka")
.option("subscribe", topic)
.options(kafkaParams)
.load()
.select($"value".as[Array[Byte]])
.map { v =>
val e = MyAvroSchema.decodeEnvelope(v)
val d = MyAvroSchema.decodeDatum(e)
d
}
此时d是代表csv行的字符串,例如
2018-01-02,user8,campaing1,type6,...
假设我可以创建一个 csvSchema:StructType
如何使用 csvSchema 将其转换为 Dataframe[Row]?一个复杂的问题是架构大小很大(大约 85 列),因此创建案例类或元组并不是一个真正的选择