我正在使用自定义 dropwizard 指标源和报告器。当我直接调用指标而不是通过指标注释时,一切都很好。
在下面的代码中,对 testTimer 的调用无需使用 Spark Metrics 注册源即可工作(我的自定义源是一个对象,它在其末尾调用了 register 方法,该方法会根据需要进行初始化)
但不幸的是,如果没有为每个执行程序手动注册源,这对于指标注释不起作用(如预期的那样)(指标系统不知道我的自定义源/报告器,我们需要这样通知)。
在下面的示例中(顺便说一下),我正在使用 sc.parrallelige 但在现实世界的应用程序中我们不能使用这种方法(尤其是在使用 spark.dynamicAllocation.enabled 时),而是我们需要注册一个 spark 监听器( addSparkListener 然后覆盖 onExecutorAdded)。
鉴于这种情况,我如何告诉 spark 在添加执行程序时调用我的 register() ?(并取消注册 onExecutorRemoved)
object MetricsTestApp extends App {
System.setProperty("configName", "testEpa")
val conf = new SparkConf().setAppName("Test") //.setMaster("local[*]")
val sparkContext = new SparkContext(conf) // .getOrCreate(conf)
sparkContext.setLogLevel("WARN")
val total = sparkContext.parallelize(1 to 3)
.map(i => {
// testTimer <-- This works fine
LashmMetrics.register()
Thread.sleep(5000)
assert(Hello.SayHello == 2)
Thread.sleep(4000)
assert(Hello.SayHello == 2)
})
.count
Thread.sleep(20000)
println(s"Total $total")
object Hello extends Serializable {
@Timed(name="TestAnnotationTimed")
def SayHello: Int = {
println("testing....")
Thread.sleep(1000)
2
}
}
def testTimer {
Thread.sleep(500)
timer("TimerFromTestApp").time{
println("sleepy..")
Thread.sleep(1000)
println("done")
}
Thread.sleep(500)
}
}