0

我有一张桌子Metadata

我想要我的 Flink 应用程序中的表格内容。所以我想读取表中的所有条目并保存到MapState<Metadata::Id, Metadata>.

如果我的应用程序重新启动,我不想从表中读取,而是从表中读取MapState<Metadata::Id, Metadata>并使用它。

有没有办法我可以做到这一点?

4

1 回答 1

1

我在此答案中链接到的 youtube 视频和 github 存储库涵盖了许多类似的场景。但是引导 Flink 状态的最好方法是使用State Processor API将数据预加载到保存点中。

请记住,FlinkMapState是一种键分区状态。因此,如果您使用MapState<Metadata::Id, Metadata>,那实际上Map<KEY, MapState<Metadata::Id, Metadata>>是由 KEY 在集群中分片的。

下面是一个示例,展示了如何创建包含 a 的保存点ValueState<Integer>

public class Bootstrap {
    public static void main( String[] args ) throws Exception {
        ExecutionEnvironment bEnv =
                ExecutionEnvironment.getExecutionEnvironment();

        BootstrapTransformation<Integer> transform =
                OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
                        .keyBy(String::valueOf)
                        .transform(new SimplestTransform());

        Savepoint
                .create(new FsStateBackend("file:///tmp/checkpoints"), 256)
                .withOperator("my-operator-uid", transform)
                .write("file:///tmp/savepoints/");

        bEnv.execute();
    }

    static public class SimplestTransform
            extends KeyedStateBootstrapFunction<String, Integer> {
        ValueState<Integer> state;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> descriptor = new
                    ValueStateDescriptor<>("total", Types.INT);
            state = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Integer value, Context ctx) throws Exception {
            state.update(value);
        }
    }
}

这将创建一个分片键/值映射,其中包含{"1": 1, "2": 2, "3": 3}.

于 2020-11-23T09:52:53.890 回答