3

我正在尝试使用 Spark Structured Streaming 处理来自 Kafka 的数据。提取数据的代码如下:

val enriched = df.select($"value" cast "string" as "json")
  .select(from_json($"json", schema) as "data")
  .select("data.*")

ds是一个 DataFrame,其中包含从 Kafka 消耗的数据。

当我尝试读取 JSON 以进行更快的查询时,问题就来了。来自的功能org.apache.spark.sql.functions from_json()要求强制提供模式。如果消息有一些不同的字段怎么办?

4

1 回答 1

3

正如@ zero323 和他或她引用的答案所暗示的那样,您提出了一个矛盾的问题:本质上,当一个人不知道架构时,如何强加一个架构?一个当然不能。我认为使用开放式集合类型的想法是您的最佳选择。

最终,几乎可以肯定的是,您可以使用案例类来表示您的数据,即使这意味着使用大量Options、需要解析的字符串和需要查询的映射。努力定义该案例类。否则,您的 Spark 作业本质上将是大量临时的、耗时的忙碌工作。

于 2017-03-31T00:27:43.427 回答