1

我正在使用结构化流来读取 csvs 和写入 kafka。Spark UI 中未显示流式处理选项卡(未使用流式处理上下文)。

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory") 

如何在 UI 中获取流媒体指标?

4

1 回答 1

1

要查看一些指标(在控制台中),您需要添加一个监听器

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = logger.debug(s"QueryStarted [id = ${event.id}, name = ${event.name}, runId = ${event.runId}]")

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = logger.warn(s"QueryProgress ${event.progress}")

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = logger.debug(s"QueryTerminated [id = ${event.id}, runId = ${event.runId}, error = ${event.exception}]")
})

QueryProgressEvent,显示有关偏移量、水印、来源、接收器等的信息。

该视频可以帮助您:监控结构化流应用程序

于 2019-05-10T15:39:17.887 回答