0

目前我们在生产环境中运行 Spark 流。我正在将代码转换为使用结构化流。我能够成功地从 Kinesis 读取数据并将(接收器)写入 S3 中的 Parquet 文件。

我们的业务逻辑要求,我们将流数据写入每小时文件夹中。来自 kinesis 的传入数据没有任何日期时间字段。所以不能按日期时间分区。我们定义了一个函数[getSubfolderNameFromDate()],它可以获取当前小时+日期(1822062017 - 一天中的第 18 小时,2017 年 6 月 22 日),因此我们可以在每小时文件夹中写入数据。

使用 Spark 流式传输,上下文会重新初始化并自动在下一小时文件夹中写入数据,但我无法通过结构化流式传输实现相同的目标。

例如,200 万条记录在一天中的第 4 个小时进行了流式传输,它应该被写入"S3_location/0422062017.parquet/",在接下来的一个小时内流式传输的数据应该在"S3_location/0522062017.parquet/"等等。

使用结构化流,它可以全天连续写入同一个文件夹,我理解这是因为它只评估一次文件夹名称并连续将数据附加到它。但是我想将新数据附加到每小时文件夹中,有没有办法做到这一点?

我目前正在使用以下查询:

 val query = streamedDataDF  
  .writeStream  
  .format("parquet")  
  .option("checkpointLocation", checkpointDir)  
  .option("path", fileLocation  + FileDirectory.getSubfolderNameFromDate() + ".parquet/")  
  .outputMode("append")   
  .start()  
4

0 回答 0