2

我正在对 spark 结构化流数据帧进行一些转换。我将转换后的数据帧作为镶木地板文件存储在 hdfs 中。现在我希望对 hdfs 的写入应该分批进行,而不是先转换整个数据帧,然后再存储数据帧。

4

1 回答 1

2

这是一个镶木地板水槽示例:

# parquet sink example
targetParquetHDFS = sourceTopicKAFKA
    .writeStream
    .format("parquet") # can be "orc", "json", "csv", etc.
    .outputMode("append") # can only be "append"
    .option("path", "path/to/destination/dir")
    .partitionBy("col") # if you need to partition
    .trigger(processingTime="...") # "mini-batch" frequency when data is outputed to sink
    .option("checkpointLocation", "path/to/checkpoint/dir") # write-ahead logs for recovery purposes
    .start()
targetParquetHDFS.awaitTermination()

更具体的细节:

卡夫卡集成:https ://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

SS 编程指南:https ://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

添加

好的...我在回复中添加了一些内容以澄清您的问题。

SS 有几种不同的触发类型

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

默认值:一旦前一个触发器完成处理,就会发生下一个触发器

固定间隔.trigger(processingTime='10 seconds') 因此 10 秒的触发器将在 00:10、00:20、00:30 触发

one-time:一次处理所有可用数据.trigger(once=True)

连续/固定检查点间隔=> 最好看编程指南文档

因此,在您的 Kafka 示例中,SS 可以通过“默认”或“固定间隔”触发器或对 Kafka 源主题中所有可用数据的“一次性”处理以微批量处理事件时间时间戳上的数据。

于 2019-04-26T13:18:58.180 回答