0

我正在使用 spark 结构化流从 kafka 主题中读取事件并对其进行处理并写入 parquet。我必须根据我在事件中获得的密钥将输出写入不同的文件夹。我尝试使用结构化流示例总是指向一个特定的文件夹。我需要为每个文件夹启动一个流吗?

df.writeStream.format("parquet").option("path", "path/to/destination/dir").start()

4

2 回答 2

0

您可以使用 foreachBatch

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

有关更多信息,您可以参考spark 文档 foreach 和 foreachBatch

于 2021-08-26T06:50:38.703 回答
0

我能够通过创建多个 writeStreams 来实现这一点,每个 writeStreams 都特定于一个表

有关详细信息,请参阅嵌套 json 中的结构化流不同模式

于 2018-08-21T19:56:06.667 回答