我想写一个从 Kafka 到 Elasticsearch 的 Spark Streaming Job。在这里,我想在从 Kafka 读取模式时动态检测模式。
你能帮我这样做吗?
我知道,这可以通过下一行在 Spark 批处理中完成。
val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema
但是在通过 Spark Streaming Job 执行相同的操作时,我们无法执行上述操作,因为流式处理只能在 Action 上进行。
请告诉我。