我正在尝试使用 Spark Structured Streaming 处理来自 Kafka 的数据。提取数据的代码如下:
val enriched = df.select($"value" cast "string" as "json")
.select(from_json($"json", schema) as "data")
.select("data.*")
ds
是一个 DataFrame,其中包含从 Kafka 消耗的数据。
当我尝试读取 JSON 以进行更快的查询时,问题就来了。来自的功能org.apache.spark.sql.functions
from_json()
要求强制提供模式。如果消息有一些不同的字段怎么办?