我试图从 [Databricks][1] 重现该示例并将其应用于 Kafka 的新连接器并触发结构化流,但是我无法使用 Spark 中的开箱即用方法正确解析 JSON ...
注意:主题以 JSON 格式写入 Kafka。
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", IP + ":9092")
.option("zookeeper.connect", IP + ":2181")
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load()
下面的代码行不通,相信是因为json列是字符串,与from_json签名的方法不匹配...
val df = ds1.select($"value" cast "string" as "json")
.select(from_json("json") as "data")
.select("data.*")
有小费吗?