在使用 Spark 结构化流时,我很难理解检查点的工作原理。
我有一个生成一些事件的 spark 进程,我将这些事件登录到 Hive 表中。对于这些事件,我在 kafka 流中收到确认事件。
我创建了一个新的火花过程
- 将 Hive 日志表中的事件读入 DataFrame
- 使用 Spark Structured Streaming 将这些事件与确认事件流连接起来
- 将连接的 DataFrame 写入 HBase 表。
我在 spark-shell 中测试了代码,它工作正常,低于伪代码(我使用的是 Scala)。
val tableA = spark.table("tableA")
val startingOffset = "earliest"
val streamOfData = .readStream
.format("kafka")
.option("startingOffsets", startingOffsets)
.option("otherOptions", otherOptions)
val joinTableAWithStreamOfData = streamOfData.join(tableA, Seq("a"), "inner")
joinTableAWithStreamOfData
.writeStream
.foreach(
writeDataToHBaseTable()
).start()
.awaitTermination()
现在我想安排这段代码定期运行,例如每 15 分钟一次,我正在努力理解如何在这里使用检查点。
在此代码的每次运行中,我只想从流中读取我在上一次运行中尚未读取的事件,并将这些新事件与我的日志表内部连接,以便仅将新数据写入最终 HBase桌子。
我在 HDFS 中创建了一个目录来存储检查点文件。我将该位置提供给用于调用 spark 代码的 spark-submit 命令。
spark-submit --conf spark.sql.streaming.checkpointLocation=path_to_hdfs_checkpoint_directory
--all_the_other_settings_and_libraries
此时代码每 15 分钟运行一次,没有任何错误,但它基本上没有做任何事情,因为它没有将新事件转储到 HBase 表。检查点目录也是空的,而我假设必须在那里写入一些文件?
是否需要调整 readStream 函数才能从最新的检查点开始读取?
val streamOfData = .readStream
.format("kafka")
.option("startingOffsets", startingOffsets) ??
.option("otherOptions", otherOptions)
我真的很难理解有关此的 spark 文档。
先感谢您!