我正在使用 PyFlink 进行流处理,并添加了一些指标来监控性能。
这是我用指标注册 udf 的代码。我已经安装了 apache-flink 1.13.0。
class Test(ScalarFunction):
def __init__(self):
self.gauge_value = 0
def open(self, function_context):
metric_group = function_context.get_metric_group().add_group("test")
metric_group.gauge("gauge", lambda: self.gauge_value)
def eval(self, i):
self.gauge_value = i
return i
test = udf(
Test(),
input_types=[DataTypes.BIGINT()],
result_type=DataTypes.BIGINT(),
)
日志中没有引发异常,一旦我将类型更改self.gauge_value
为str,它就会出错,所以我相信该行已执行并注册了仪表。
但是,我永远无法在 Web UI 的指标选项卡中找到仪表。我尝试在此测试 udf 中添加计数器和计量器,它们都正确显示在指标选项卡中。
PyFlink 支持仪表吗?如果没有,还有其他选择吗?