目前我们在生产环境中运行 Spark 流。我正在将代码转换为使用结构化流。我能够成功地从 Kinesis 读取数据并将(接收器)写入 S3 中的 Parquet 文件。
我们的业务逻辑要求,我们将流数据写入每小时文件夹中。来自 kinesis 的传入数据没有任何日期时间字段。所以不能按日期时间分区。我们定义了一个函数[getSubfolderNameFromDate()]
,它可以获取当前小时+日期(1822062017 - 一天中的第 18 小时,2017 年 6 月 22 日),因此我们可以在每小时文件夹中写入数据。
使用 Spark 流式传输,上下文会重新初始化并自动在下一小时文件夹中写入数据,但我无法通过结构化流式传输实现相同的目标。
例如,200 万条记录在一天中的第 4 个小时进行了流式传输,它应该被写入"S3_location/0422062017.parquet/"
,在接下来的一个小时内流式传输的数据应该在"S3_location/0522062017.parquet/"
等等。
使用结构化流,它可以全天连续写入同一个文件夹,我理解这是因为它只评估一次文件夹名称并连续将数据附加到它。但是我想将新数据附加到每小时文件夹中,有没有办法做到这一点?
我目前正在使用以下查询:
val query = streamedDataDF
.writeStream
.format("parquet")
.option("checkpointLocation", checkpointDir)
.option("path", fileLocation + FileDirectory.getSubfolderNameFromDate() + ".parquet/")
.outputMode("append")
.start()