3

在使用 Spark 结构化流时,我很难理解检查点的工作原理。

我有一个生成一些事件的 spark 进程,我将这些事件登录到 Hive 表中。对于这些事件,我在 kafka 流中收到确认事件。

我创建了一个新的火花过程

  • 将 Hive 日志表中的事件读入 DataFrame
  • 使用 Spark Structured Streaming 将这些事件与确认事件流连接起来
  • 将连接的 DataFrame 写入 HBase 表。

我在 spark-shell 中测试了代码,它工作正常,低于伪代码(我使用的是 Scala)。

val tableA = spark.table("tableA")

val startingOffset = "earliest"

val streamOfData = .readStream 
  .format("kafka") 
  .option("startingOffsets", startingOffsets)
  .option("otherOptions", otherOptions)

val joinTableAWithStreamOfData = streamOfData.join(tableA, Seq("a"), "inner")

joinTableAWithStreamOfData 
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  ).start()
  .awaitTermination()

现在我想安排这段代码定期运行,例如每 15 分钟一次,我正在努力理解如何在这里使用检查点。

在此代码的每次运行中,我只想从流中读取我在上一次运行中尚未读取的事件,并将这些新事件与我的日志表内部连接,以便仅将新数据写入最终 HBase桌子。

我在 HDFS 中创建了一个目录来存储检查点文件。我将该位置提供给用于调用 spark 代码的 spark-submit 命令。

spark-submit --conf spark.sql.streaming.checkpointLocation=path_to_hdfs_checkpoint_directory 
--all_the_other_settings_and_libraries

此时代码每 15 分钟运行一次,没有任何错误,但它基本上没有做任何事情,因为它没有将新事件转储到 HBase 表。检查点目录也是空的,而我假设必须在那里写入一些文件?

是否需要调整 readStream 函数才能从最新的检查点开始读取?

val streamOfData = .readStream 
  .format("kafka") 
  .option("startingOffsets", startingOffsets) ??
  .option("otherOptions", otherOptions)

我真的很难理解有关此的 spark 文档。

先感谢您!

4

1 回答 1

2

扳机

“现在我想安排这段代码定期运行,例如每 15 分钟一次,我正在努力理解如何在这里使用检查点。

如果您希望每 15 分钟触发一次作业,则可以使用Triggers

您不需要专门“使用”检查点,只需提供可靠的(例如 HDFS)检查点位置,见下文。

检查点

在此代码的每次运行中,我只想从流中读取我在上一次运行中尚未读取的事件 [...]"

在 Spark 结构化流应用程序中从 Kafka 读取数据时,最好将检查点位置直接设置在您的StreamingQuery. Spark 使用此位置创建检查点文件,以跟踪应用程序的状态并记录已从 Kafka 读取的偏移量。

重新启动应用程序时,它将检查这些检查点文件以了解从何处继续从 Kafka 读取,因此它不会跳过或错过任何消息。您不需要手动设置startingOffset。

请务必记住,仅允许对应用程序代码进行特定更改,以便检查点文件可用于安全重启。可以在流式查询更改后恢复语义的结构化流式编程指南中找到一个很好的概述。


总的来说,对于从 Kafka 读取的高效 Spark Structured Streaming 应用程序,我推荐以下结构:

val spark = SparkSession.builder().[...].getOrCreate()

val streamOfData = spark.readStream 
  .format("kafka") 
// option startingOffsets is only relevant for the very first time this application is running. After that, checkpoint files are being used.
  .option("startingOffsets", startingOffsets) 
  .option("otherOptions", otherOptions)
  .load()

// perform any kind of transformations on streaming DataFrames
val processedStreamOfData = streamOfData.[...]


val streamingQuery = processedStreamOfData 
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  )
  .option("checkpointLocation", "/path/to/checkpoint/dir/in/hdfs/"
  .trigger(Trigger.ProcessingTime("15 minutes"))
  .start()

streamingQuery.awaitTermination()
于 2020-12-11T07:39:04.290 回答