0

我在 PyFlink 中按如下方式定义了用户自定义指标（user-defined metrics）：

import logging
import time

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunction, RuntimeContext
from pyflink.datastream.connectors import StreamingFileSink


class TestMapFunction(MapFunction):
    """Identity map that exposes the first field of the latest record as a gauge.

    The gauge is named ``current_offset`` and lives in the metric sub-group
    ``test_group``; it reports whatever ``self.offset`` holds when it is read.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Last offset recorded by ``map``; read by the gauge registered in ``open``.
        self.offset = 0

    def open(self, runtime_context: RuntimeContext):
        """Register the ``test_group.current_offset`` gauge.

        The gauge is attached to the metric group obtained from
        ``runtime_context`` rather than being created in ``__init__``.
        """
        super().open(runtime_context)
        metrics = runtime_context.get_metrics_group().add_group("test_group")
        # The lambda reads ``self.offset`` at call time, so each read of the
        # gauge sees the most recent offset, not the value at registration.
        metrics.gauge("current_offset", lambda: self.offset)
        logging.info('#### add gauge')

    def map(self, value):
        """Record ``value[0]`` as the current offset and return ``value`` unchanged.

        Sleeps one second per record -- presumably a deliberate throttle so the
        gauge can be observed while the job runs.
        """
        self.set_offset(value[0])
        time.sleep(1)
        logging.info("#### value: %s", value)
        return value

    def set_offset(self, offset):
        """Log ``offset`` and store it as the value reported by the gauge."""
        logging.info("### offset: %s", offset)
        self.offset = offset


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    # NOTE(review): the "~" in this path may not be shell-expanded when the
    # Python worker is launched; if the worker fails to start, switch to an
    # absolute path.
    env.set_python_executable("~/.pyenv/versions/python3.7_common/bin/python")
    # Give every operator its own vertex (presumably so the map's metrics can be
    # inspected under a dedicated vertex in the REST API / web UI).
    env.disable_operator_chaining()

    # Schema shared by the source and by the map output: the map passes each
    # (id, label) row through unchanged.
    row_type = Types.ROW([Types.INT(), Types.STRING()])

    ds = env.from_collection(
        collection=[(i, f'#{i}') for i in range(100000)],
        type_info=row_type)

    # Declare the output type explicitly: without it a Python map emits pickled
    # byte arrays, and the Java file sink below would write their toString()
    # (e.g. "[B@1a2b3c") instead of the rows.
    ds = ds.map(TestMapFunction(), output_type=row_type)
    temp_sink = StreamingFileSink.for_row_format(
        '/tmp/output', Encoder.simple_string_encoder()).build()
    ds.add_sink(temp_sink)

    env.execute("tutorial_job")

作业提交到 Flink 集群后，我通过 REST API 查询了指标，但找不到 `test_group` 或 `current_offset` 指标，在 Web 仪表板（Dashboard）上也找不到。我查询的接口如下：

- http://localhost:8081/jobs/a680d06c3957d484700878c47fe5d5bd/metrics
- http://localhost:8081/jobs/a680d06c3957d484700878c47fe5d5bd/vertices/e3dfc0d7e9ecd8a43f85f0b68ebf3b80/metrics

Flink 版本为 1.13.1，Python 版本为 3.7。

我应该怎么办?

4

0 回答 0