我正在使用 spark 结构化流来使用来自 kafka 主题的数据并将数据写入另一个 kafka 接收器。
我想存储偏移量两次 - 从主题中读取一次并搅拌偏移量。其次,当将数据写入输出接收器并写入偏移量时,这可以通过提供检查点目录位置来实现,
是否可以写订阅主题时消耗的偏移量。
我正在使用 spark 结构化流来使用来自 kafka 主题的数据并将数据写入另一个 kafka 接收器。
我想存储偏移量两次 - 从主题中读取一次并搅拌偏移量。其次,当将数据写入输出接收器并写入偏移量时,这可以通过提供检查点目录位置来实现,
是否可以写订阅主题时消耗的偏移量。
您可以使用StreamingQueryListener。您可以通过以下方式将侦听器添加到您的流中
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
// insert code here to log the offsets in addition to Spark's checkpoint
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
})