1

我对startingOffsets结构化流媒体感到困惑。在这里的官方文档中,它说查询类型

  1. 式传输- 这是连续流式传输吗?
  2. 批处理- 这是用于使用 forEachBatch 还是触发器进行查询?(最新的不允许)

我的工作流程也已checkpoints启用。这如何与 一起工作startingOffsets?如果我的工作流程崩溃并且我有startingOffsetsas latest,火花检查 kafka 偏移量或火花检查点偏移量或两者兼而有之?

4

1 回答 1

2

默认情况下,流式处理在 Spark 中意味着“微批量”。根据您设置的触发器,它将检查给定频率的新数据源。你可以使用它

val df = spark
  .readStream
  .format("kafka")
  .[...]

对于 Kafka,还有一个实验性的连续触发器,它允许以相当低的延迟处理数据。请参阅文档中的连续处理部分。

另一方面,批处理就像读取您执行一次的文本文件(例如 csv)一样。您可以使用

val df = spark
  .read
  .format("kafka")
  .[...]

请注意readStream流式处理和read批处理的区别。在批处理模式下,startingOffset只能设置为earliest,即使您使用检查点,它也将始终从最早的偏移量开始,以防计划或计划外重新启动。

结构化流式处理中的检查点需要在writeStream部件中设置,并且需要对每个查询都是唯一的(如果您从同一源运行多个流式查询)。如果您设置了该检查点位置并重新启动应用程序,Spark 将仅查看这些检查点文件。只有当查询第一次开始时,它才会检查该startingOffset选项。

请记住,结构化流永远不会向 Kafka 提交任何偏移量。它只依赖于它的检查点文件。请参阅我关于如何在 Spark 结构化流中手动设置 group.id 和提交 kafka 偏移量的其他答案?.

如果您计划运行您的应用程序,例如,一天一次,因此最好使用readStream启用检查点和触发器writeStream.trigger(Trigger.Once)。关于这种方法的一个很好的解释在 Databricks 博客中给出了关于每天运行一次流式作业以节省 10 倍的成本

于 2021-10-06T17:06:56.863 回答