我正在对 spark 结构化流数据帧进行一些转换。我将转换后的数据帧作为镶木地板文件存储在 hdfs 中。现在我希望对 hdfs 的写入应该分批进行,而不是先转换整个数据帧,然后再存储数据帧。
1 回答
这是一个镶木地板水槽示例:
# parquet sink example
targetParquetHDFS = sourceTopicKAFKA
.writeStream
.format("parquet") # can be "orc", "json", "csv", etc.
.outputMode("append") # can only be "append"
.option("path", "path/to/destination/dir")
.partitionBy("col") # if you need to partition
.trigger(processingTime="...") # "mini-batch" frequency when data is outputed to sink
.option("checkpointLocation", "path/to/checkpoint/dir") # write-ahead logs for recovery purposes
.start()
targetParquetHDFS.awaitTermination()
更具体的细节:
卡夫卡集成:https ://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
SS 编程指南:https ://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
添加
好的...我在回复中添加了一些内容以澄清您的问题。
SS 有几种不同的触发类型:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
默认值:一旦前一个触发器完成处理,就会发生下一个触发器
固定间隔:
.trigger(processingTime='10 seconds')
因此 10 秒的触发器将在 00:10、00:20、00:30 触发
one-time:一次处理所有可用数据.trigger(once=True)
连续/固定检查点间隔=> 最好看编程指南文档
因此,在您的 Kafka 示例中,SS 可以通过“默认”或“固定间隔”触发器或对 Kafka 源主题中所有可用数据的“一次性”处理以微批量处理事件时间时间戳上的数据。