0

我想写一个从 Kafka 到 Elasticsearch 的 Spark Streaming Job。在这里,我想在从 Kafka 读取模式时动态检测模式。

你能帮我这样做吗?

我知道,这可以通过下一行在 Spark 批处理中完成。

val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema

但是在通过 Spark Streaming Job 执行相同的操作时,我们无法执行上述操作,因为流式处理只能在 Action 上进行。

请告诉我。

4

1 回答 1

1

如果您正在收听 kafka 主题,则不能依靠 spark 自动推断 json 模式,因为这将花费大量时间。因此,您需要以某种方式将架构提供给您的应用程序。

如果您正在从文件源收听,则可以这样做。

'spark.sql.streaming.schemaInference', 'true'
于 2021-12-15T14:54:02.400 回答