1

我有一个非标准的 kafka 格式消息,所以代码如下所示

 val df:Dataset[String] = spark
  .readStream
  .format("kafka")
  .option("subscribe", topic)
  .options(kafkaParams)
  .load()
  .select($"value".as[Array[Byte]])
  .map { v =>
    val e = MyAvroSchema.decodeEnvelope(v)
    val d = MyAvroSchema.decodeDatum(e)
    d 
  }

此时d是代表csv行的字符串,例如

2018-01-02,user8,campaing1,type6,...

假设我可以创建一个 csvSchema:StructType

如何使用 csvSchema 将其转换为 Dataframe[Row]?一个复杂的问题是架构大小很大(大约 85 列),因此创建案例类或元组并不是一个真正的选择

4

0 回答 0