我有一张桌子Metadata
。
我想要我的 Flink 应用程序中的表格内容。所以我想读取表中的所有条目并保存到MapState<Metadata::Id, Metadata>
.
如果我的应用程序重新启动,我不想从表中读取,而是从表中读取MapState<Metadata::Id, Metadata>
并使用它。
有没有办法我可以做到这一点?
我有一张桌子Metadata
。
我想要我的 Flink 应用程序中的表格内容。所以我想读取表中的所有条目并保存到MapState<Metadata::Id, Metadata>
.
如果我的应用程序重新启动,我不想从表中读取,而是从表中读取MapState<Metadata::Id, Metadata>
并使用它。
有没有办法我可以做到这一点?
我在此答案中链接到的 youtube 视频和 github 存储库涵盖了许多类似的场景。但是引导 Flink 状态的最好方法是使用State Processor API将数据预加载到保存点中。
请记住,FlinkMapState
是一种键分区状态。因此,如果您使用MapState<Metadata::Id, Metadata>
,那实际上Map<KEY, MapState<Metadata::Id, Metadata>>
是由 KEY 在集群中分片的。
下面是一个示例,展示了如何创建包含 a 的保存点ValueState<Integer>
:
public class Bootstrap {
public static void main( String[] args ) throws Exception {
ExecutionEnvironment bEnv =
ExecutionEnvironment.getExecutionEnvironment();
BootstrapTransformation<Integer> transform =
OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
.keyBy(String::valueOf)
.transform(new SimplestTransform());
Savepoint
.create(new FsStateBackend("file:///tmp/checkpoints"), 256)
.withOperator("my-operator-uid", transform)
.write("file:///tmp/savepoints/");
bEnv.execute();
}
static public class SimplestTransform
extends KeyedStateBootstrapFunction<String, Integer> {
ValueState<Integer> state;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Integer> descriptor = new
ValueStateDescriptor<>("total", Types.INT);
state = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Integer value, Context ctx) throws Exception {
state.update(value);
}
}
}
这将创建一个分片键/值映射,其中包含{"1": 1, "2": 2, "3": 3}
.