以下是我在 PyFlink 中定义用户自定义指标（user-defined metrics）的代码：
import logging
import time
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunction, RuntimeContext
from pyflink.datastream.connectors import StreamingFileSink
class TestMapFunction(MapFunction):
    """Pass-through map that exposes the first field of the latest record as a gauge.

    A gauge named ``current_offset`` is registered under the ``test_group``
    sub-group of the operator's metric group; it reports ``self.offset``,
    which ``map`` updates with ``value[0]`` for every record.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Last offset seen by map(); read by the gauge callback registered in open().
        self.offset = 0

    def open(self, runtime_context: RuntimeContext):
        """Register the ``test_group.current_offset`` gauge.

        The metric group is taken from the ``runtime_context`` passed to
        ``open``, so registration happens here rather than in ``__init__``.
        """
        super().open(runtime_context)
        test_group = runtime_context.get_metrics_group().add_group("test_group")
        # The lambda reads self.offset lazily, so each metric report sees the
        # most recent value written by set_offset().
        test_group.gauge("current_offset", lambda: self.offset)
        logging.info('#### add gauge')

    def map(self, value):
        """Store ``value[0]`` as the current offset and return ``value`` unchanged.

        NOTE(review): the one-second sleep per record presumably exists to slow
        the job down for observation. Python-side metrics are presumably only
        forwarded to the JVM when a bundle finishes, so with this sleep and the
        default bundle size the gauge may take a very long time to appear in the
        REST API / dashboard — confirm against the Flink 1.13 Python docs.
        """
        self.set_offset(value[0])
        time.sleep(1)
        logging.info("#### value: %s", value)
        return value

    def set_offset(self, offset):
        """Update the offset reported by the ``current_offset`` gauge."""
        logging.info('### offset: %s', offset)
        self.offset = offset
if __name__ == '__main__':
    import os  # only the script entry point needs it

    env = StreamExecutionEnvironment.get_execution_environment()
    # Flink presumably does not perform shell tilde expansion on this path, so
    # expand "~" on the submitting client. NOTE(review): this resolves to the
    # submitter's home directory; on a remote cluster use a path that exists on
    # every TaskManager.
    env.set_python_executable(
        os.path.expanduser("~/.pyenv/versions/python3.7_common/bin/python"))
    # With chaining disabled the map runs as its own job vertex, so its metrics
    # (including test_group.current_offset) should be listed under the Map
    # vertex id in /jobs/<job_id>/vertices/<vertex_id>/metrics — not under the
    # source or sink vertex.
    env.disable_operator_chaining()

    # NOTE(review): metrics registered in Python functions are presumably sent
    # to the JVM only when a bundle finishes. With time.sleep(1) per record and
    # the default python.fn-execution.bundle.size, the first bundle can take a
    # very long time, so the gauge may not show up for a while. Consider a
    # smaller bundle, e.g. `flink run -Dpython.fn-execution.bundle.size=10 ...`
    # or the same key in flink-conf.yaml — verify against the Flink 1.13 docs.
    records = [(i, f'#{i}') for i in range(100000)]
    ds = env.from_collection(
        collection=records,
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds = ds.map(TestMapFunction())

    temp_sink = StreamingFileSink.for_row_format(
        '/tmp/output', Encoder.simple_string_encoder()).build()
    ds.add_sink(temp_sink)
    env.execute("tutorial_job")
将作业提交到 Flink 集群后，我调用了 metrics REST API，但找不到 `test_group` 或 `current_offset` 指标，在 Web 仪表板（Dashboard）上也找不到。我查询的接口如下：
http://localhost:8081/jobs/a680d06c3957d484700878c47fe5d5bd/metrics
http://localhost:8081/jobs/a680d06c3957d484700878c47fe5d5bd/vertices/e3dfc0d7e9ecd8a43f85f0b68ebf3b80/metrics
Flink 版本为 1.13.1，Python 版本为 3.7。
我应该怎么办？