我使用 JMX 作为指标报告器来获取 Flink 指标,但是有没有办法将它作为终端的输出?
我想numRecordsInPerSecond
为每个运算符绘制性能分析,我该怎么做?
我看过一些累加器的例子,但它并没有让我正确地了解如何对 Flink 进行性能分析。我在这里给你一个例子
这是我的Flink程序的执行计划,我有多个问题,但我想问一个基本的
如何测量每个运算符的延迟,然后将其相加以计算复杂事件的总延迟。
如何测量输出吞吐量?目前,我已经在 select 函数中编写了一些代码,这些代码计算了看到的复杂事件的数量和 Flink 引擎启动的时间。这是最好的方法吗?
但基本问题仍然存在,即如何通过代码获取Flink 指标中提到的系统指标的输出以显示在终端输出中,因为我想绘制性能图表,而 JMX 的问题是它向我显示指标从某种意义上说,当我在 JMX 控制台中单击该特定指标时,我会看到这些值,这并不完全适合分析系统。
PS - 我在 StackOverflow 上发现了一个关于计算吞吐量和延迟的问题,答案是这样的
private static class MyMapper extends RichMapFunction<String, Object> {
private transient Meter meter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
}
@Override
public Object map(String value) throws Exception {
this.meter.markEvent();
return value;
}
}
我也在我的代码中添加了上面的类,但没有看到任何输出,我也想知道这个代码将如何显示吞吐量或延迟,因为我们没有提到我们想要找到哪个运算符的延迟?例如,我想在执行计划中间而不是计划结束时为某个操作员找到吞吐量,上面的代码会为我做吗?