3

我正在尝试为我们的 ST 服务器构建一个监控系统。到目前为止,诸如记录查询、检索的行/红色和花费的时间之类的都可以。

我已经实现了一个自定义监听器,我能够毫无问题地检索查询和时间,监听SparkListenerSQLExecutionStartSparkListenerSQLExecutionEnd

像这样的东西:

  //Structure to hold executionId, query itself from the description and startTime
  val queries = new HashMap[Long, (String, Long)]

def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    logger.info(s"----------executionId: ${event.executionId}")
    logger.info(s"----------description: ${event.description}")
    logger.info(s"----------startTime: ${event.time}")
    logger.info(s"----------metrics")

    queries.put(event.executionId, (event.description, event.time))
}

def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
  logger.info("-----onExecutionEnd")
  logger.info(s"---------executionId: ${event.executionId}")
  logger.info(s"---------endTime: ${event.time}")


  val executedPlanMetricsMap = queryExecution.executedPlan.metrics
  printMetrics(executedPlanMetricsMap, "executedPlan")
}

//Help method to print metrics map
def printMetrics[A] (metricsMap: Map[String, A], metricType:String): Unit ={
    try {
      logger.info(s"---------metrics from $metricType with size: ${metricsMap.size}")
      metricsMap.foreach { case (k, v) => logger.info(s"---------------metric from $metricType " +
        s"key: $k, value: $v")}
    } catch {
        case e: Exception => logger.info(s"---------empty $metricType")
      }
  }

到目前为止,我只是在日志上打印它们,只是为了检查是否可以检索我需要的值。这一示例输出

INFO EventMonitorCustomListener: ---------executionId: 16
INFO EventMonitorCustomListener: ---------endTime: 1630665171840
INFO EventMonitorCustomListener: ---------query: select *
    from <myDatabase>.<myTable>
INFO EventMonitorCustomListener: ---------metrics from executedPlan with size: 6
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numFiles, value: SQLMetric(id: 82, name: Some(number of files read), value: 1)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: pruningTime, value: SQLMetric(id: 85, name: Some(dynamic partition pruning time), value: -1)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: metadataTime, value: SQLMetric(id: 83, name: Some(metadata time), value: 121)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: filesSize, value: SQLMetric(id: 84, name: Some(size of files read), value: 36148)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numOutputRows, value: SQLMetric(id: 81, name: Some(number of output rows), value: 0)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numPartitions, value: SQLMetric(id: 86, name: Some(number of partitions read), value: 1)

如您所见,我得到了查询,即开始和结束时间所花费的总时间。但是当我检查执行计划中的指标时,值并不一致。numOutputRows 应该是数字时为 0(特别是在我的示例中为 21)我也尝试过,queryExecution.executedPlan.collectLeaves().head.metrics因为我在这里找到了相同的结果

第一个问题:是否可以从事件中检索输出行数SparkListenerSQLExecutionEnd

如果没有,我可以使用键“internal.metrics.input.recordsRead”SparkListenerStageCompleted从可累积的事件中检索它们。stageCompleted.stageInfo.accumulables但在这种情况下,我无法将阶段或作业链接到 SQL 执行。即我得到了 jobid=4 和 stageId=5, executionId=12 但在任何情况下都没有任何价值可以将它们链接到另一个。

第二个问题:你知道一种方法可以知道执行属于哪个阶段或作业吗?

亲切的问候

编辑:

我找到了一种通过 jobStart.executionId 将 executionId 与 JobId 联系起来的方法。它在 docker 的 STS 中运行良好,但在我真正的 STS 中却不行。可能与 STS 配置有关?

4

1 回答 1

0

我们终于找到了为什么我的 jobStart 在属性中没有 executionId 的原因。这是因为增量收集设置为真。设置为 false 可以解决问题。

于 2021-09-12T07:43:40.783 回答