1

我正在使用 PyFlink 进行流处理,并添加了一些指标来监控性能。

这是我用指标注册 udf 的代码。我已经安装了 apache-flink 1.13.0。

class Test(ScalarFunction):
    def __init__(self):
        self.gauge_value = 0

    def open(self, function_context):
        metric_group = function_context.get_metric_group().add_group("test")
        metric_group.gauge("gauge", lambda: self.gauge_value)

    def eval(self, i):
        self.gauge_value = i
        return i


test = udf(
    Test(),
    input_types=[DataTypes.BIGINT()],
    result_type=DataTypes.BIGINT(),
)

日志中没有引发异常,一旦我将类型更改self.gauge_value为str,它就会出错,所以我相信该行已执行并注册了仪表。

但是,我永远无法在 Web UI 的指标选项卡中找到仪表。我尝试在此测试 udf 中添加计数器和计量器,它们都正确显示在指标选项卡中。

PyFlink 支持仪表吗?如果没有,还有其他选择吗?

4

0 回答 0