0

我正在使用 spark 结构化流来使用来自 kafka 主题的数据并将数据写入另一个 kafka 接收器。

我想存储偏移量两次 - 从主题中读取一次并搅拌偏移量。其次,当将数据写入输出接收器并写入偏移量时,这可以通过提供检查点目录位置来实现,

是否可以写订阅主题时消耗的偏移量。

4

1 回答 1

0

您可以使用StreamingQueryListener。您可以通过以下方式将侦听器添加到您的流中

spark.streams.addListener(new StreamingQueryListener() {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { 

    // insert code here to log the offsets in addition to Spark's checkpoint

  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {}

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
})
于 2020-03-31T11:14:01.493 回答