0

我有一个带有多个节点的 Spark 集群。我的应用程序使用 spark-submit 命令在该集群上运行。我想从我的应用程序中收集指标,主要是我想在工作人员上运行的代码部分中收集的一些计数器和直方图。我知道 Spark 有一个累加器,但我不能在我的场景中使用它们。我读到了 Pushgateway 导出器,我想知道如何将在不同工作人员上创建的指标适当地推送到 Pushgateway?

我将举一个基本字数统计程序的示例,这只是为了简化我的用例:

CollectorRegistry registry = new CollectorRegistry();
JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaPairRDD<String, Integer> counts = textFile
     .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
     .mapToPair(word -> {
                          Counter wordsCounter = Counter.build().name("wordCounterMetric").help("some help msg").register(registry);
                          wordsCounter.inc();
                          new Tuple2<>(word, 1);
      })
     .reduceByKey((a, b) -> a + b);

PushGateway pg = new PushGateway(EXPORTER_ADDRESS);
pg.pushAdd(registry, JOB);

我不确定上面的示例是否会按我的预期工作......基本上,我想计算每个工作人员处理的单词总和,并将这个指标推送到将被 Prometheus 抓取的 Pushgateway。

如果 textFile = "This is my text" 在 Spark 进程结束时,Prometheus 中的 wordCounterMetric 值应该是 4。

我怎么能做到这一点?

4

0 回答 0