11

我正在开发一个 Spark Streaming 程序,该程序检索 Kafka 流,对流进行非常基本的转换,然后将数据插入数据库(如果相关,则为 voltdb)。我正在尝试测量将行插入数据库的速率。我认为指标可能很有用(使用 JMX)。但是我找不到如何将自定义指标添加到 Spark。我查看了 Spark 的源代码,也找到了这个线程,但它对我不起作用。我还在 conf.metrics 文件中启用了 JMX 接收器。什么不起作用是我没有使用 JConsole 看到我的自定义指标。

有人可以解释如何添加自定义指标(最好通过 JMX)来触发流式传输吗?或者如何测量我对我的数据库(特别是 VoltDB)的插入率?我在 Java 8 中使用 spark。

4

5 回答 5

17

好的,在挖掘源代码后,我发现了如何添加我自己的自定义指标。它需要3样东西:

  1. 创建我自己的自定义。有点像这样
  2. 在 spark metrics.properties 文件中启用 Jmx 接收器。我使用的具体行是:*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSinkwhich enable JmxSink for all instances
  3. 在 SparkEnv 度量系统中注册我的自定义源。可以在此处查看如何操作的示例- 我之前实际上查看过此链接,但错过了注册部分,这使我无法在 JVisualVM 中实际看到我的自定义指标

我仍在为如何实际计算插入 VoltDB 的数量而苦苦挣扎,因为代码在执行程序上运行,但这是另一个主题的主题:)

我希望这对其他人有帮助

于 2015-10-01T09:12:00.330 回答
6

Groupon 有一个名为的库spark-metrics,可让您在执行程序上使用简单的(类似 Codahale 的)API,并将结果整理回驱动程序并自动注册到 Spark 现有的指标注册表中。然后,当您根据Spark 文档配置指标接收器时,这些指标会与 Spark 的内置指标一起自动导出。

于 2017-03-28T12:32:00.997 回答
3

要根据 VoltDB 的插入插入行,请使用累加器 - 然后从您的驱动程序中创建一个侦听器 - 也许这样可以让您入门

sparkContext.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    stageCompleted.stageInfo.accumulables.foreach { case (_, acc) => {

在这里,您可以访问那些组合累加器的行,然后您可以发送到您的接收器..

于 2015-11-07T23:55:20.370 回答
2

下面是 Java 中的一个工作示例。
它已经过测试StreaminQuery(不幸的是,在 Spark 2.3.1 之前StreaminQuery没有 ootb 指标)。StreamingContext

脚步:

Source在同一个类包中定义自定义源

package org.apache.spark.metrics.source;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

/**
 * Metrics source for structured streaming query.
 */
public class StreamingQuerySource implements Source {
    private String appName;
    private MetricRegistry metricRegistry = new MetricRegistry();
    private final Progress progress = new Progress();

    public StreamingQuerySource(String appName) {
        this.appName = appName;
        registerGuage("batchId", () -> progress.batchId());
        registerGuage("numInputRows", () -> progress.numInputRows());
        registerGuage("inputRowsPerSecond", () -> progress.inputRowsPerSecond());
        registerGuage("processedRowsPerSecond", () -> progress.processedRowsPerSecond());
    }

    private <T> Gauge<T> registerGuage(String name, Gauge<T> metric) {
        return metricRegistry.register(MetricRegistry.name(name), metric);
    }

    @Override
    public String sourceName() {
        return String.format("%s.streaming", appName);
    }


    @Override
    public MetricRegistry metricRegistry() {
        return metricRegistry;
    }

    public void updateProgress(StreamingQueryProgress queryProgress) {
        progress.batchId(queryProgress.batchId())
                .numInputRows(queryProgress.numInputRows())
                .inputRowsPerSecond(queryProgress.inputRowsPerSecond())
                .processedRowsPerSecond(queryProgress.processedRowsPerSecond());
    }

    @Data
    @Accessors(fluent = true)
    private static class Progress {
        private long batchId = -1;
        private long numInputRows = 0;
        private double inputRowsPerSecond = 0;
        private double processedRowsPerSecond = 0;
    }
}

创建 SparkContext 后立即注册源

    querySource = new StreamingQuerySource(getSparkSession().sparkContext().appName());
    SparkEnv.get().metricsSystem().registerSource(querySource);

更新 StreamingQueryListener.onProgress(event) 中的数据

  querySource.updateProgress(event.progress());

配置metrics.properties

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=xxx
*.sink.graphite.port=9109
*.sink.graphite.period=10
*.sink.graphite.unit=seconds

# Enable jvm source for instance master, worker, driver and executor
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

石墨导出器中的样本输出(映射到普罗米修斯格式)

streaming_query{application="local-1538032184639",model="model1",qty="batchId"} 38
streaming_query{application="local-1538032184639",model="model1r",qty="inputRowsPerSecond"} 2.5
streaming_query{application="local-1538032184639",model="model1",qty="numInputRows"} 5
streaming_query{application="local-1538032184639",model=model1",qty="processedRowsPerSecond"} 0.81
于 2018-09-27T08:30:06.483 回答
2

这是一个很好的教程,涵盖了使用 Graphite 设置 Spark 的 MetricsSystem 所需的所有设置。这应该够了吧:

http://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/

于 2015-09-29T15:09:25.517 回答