0

我正在使用自定义 dropwizard 指标源和报告器。当我直接调用指标而不是通过指标注释时,一切都很好。

在下面的代码中,对 testTimer 的调用无需使用 Spark Metrics 注册源即可工作(我的自定义源是一个对象,它在其末尾调用了 register 方法,该方法会根据需要进行初始化)

但不幸的是,如果没有为每个执行程序手动注册源,这对于指标注释不起作用(如预期的那样)(指标系统不知道我的自定义源/报告器,我们需要这样通知)。

在下面的示例中(顺便说一下),我正在使用 sc.parrallelige 但在现实世界的应用程序中我们不能使用这种方法(尤其是在使用 spark.dynamicAllocation.enabled 时),而是我们需要注册一个 spark 监听器( addSparkListener 然后覆盖 onExecutorAdded)。

鉴于这种情况,我如何告诉 spark 在添加执行程序时调用我的 register() ?(并取消注册 onExecutorRemoved)

object MetricsTestApp extends App {

System.setProperty("configName", "testEpa")
val conf = new SparkConf().setAppName("Test") //.setMaster("local[*]")
val sparkContext = new SparkContext(conf) //  .getOrCreate(conf)
sparkContext.setLogLevel("WARN")

val total = sparkContext.parallelize(1 to 3)
    .map(i => {
//      testTimer <-- This works fine
    LashmMetrics.register()
    Thread.sleep(5000)
    assert(Hello.SayHello == 2)
    Thread.sleep(4000)
    assert(Hello.SayHello == 2)      
    })
    .count

Thread.sleep(20000)
println(s"Total $total")

object Hello extends Serializable {

    @Timed(name="TestAnnotationTimed")
    def SayHello: Int = {
    println("testing....")
    Thread.sleep(1000)
    2
    }
}

def testTimer {

    Thread.sleep(500)
    timer("TimerFromTestApp").time{
    println("sleepy..")
    Thread.sleep(1000)
    println("done")
    }
    Thread.sleep(500)
}
}
4

0 回答 0