我正在使用 spark 结构化流从 kafka 主题中读取事件并对其进行处理并写入 parquet。我必须根据我在事件中获得的密钥将输出写入不同的文件夹。我尝试使用结构化流示例总是指向一个特定的文件夹。我需要为每个文件夹启动一个流吗?
df.writeStream.format("parquet").option("path", "path/to/destination/dir").start()
我正在使用 spark 结构化流从 kafka 主题中读取事件并对其进行处理并写入 parquet。我必须根据我在事件中获得的密钥将输出写入不同的文件夹。我尝试使用结构化流示例总是指向一个特定的文件夹。我需要为每个文件夹启动一个流吗?
df.writeStream.format("parquet").option("path", "path/to/destination/dir").start()
您可以使用 foreachBatch
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
有关更多信息,您可以参考spark 文档 foreach 和 foreachBatch
我能够通过创建多个 writeStreams 来实现这一点,每个 writeStreams 都特定于一个表
有关详细信息,请参阅嵌套 json 中的结构化流不同模式