我正在编写一个 Flink 程序,在重新启动新作业时无法保存我的状态变量。
我使用来自 Kafka 的连接器制作了一个简单的程序,在该连接器中接收消息和带有 valueState 变量的 RichFlatMap。这个变量是一个整数,每收到一条消息就加 1。
当值在 15 左右时,我创建了一个停止保存点,但是当我从该保存点恢复它时,计数器又回到 1。
Streamingjob.java:
KeyedStream<JsonNode, Object> eventsByKey = env
.addSource(consumer).name("Producer Topic Source")
.keyBy(e -> {...});
eventsByKey
.flatMap(new Test())
.uid("test-id")
测试.java:
public class Test extends RichFlatMapFunction<JsonNode, JsonNode> {
private transient ValueState<Integer> persistence;
@Override
public void flatMap(JsonNode node, Collector<JsonNode> collector) throws Exception {
if (persistence.value() == null) persistence.update(1);
String device_id = node.get("data").get("device_id").toString();
System.out.println();
System.out.println(device_id);
System.out.println(persistence.value());
System.out.println();
persistencia.update(persistence.value() + 1);
}
@Override
public void open(Configuration config) {
this.persistence = getRuntimeContext().getState(new ValueStateDescriptor<>(
"prueba", // the state name
Integer.class));
}
}
这是我用于停止保存点的命令
../bin/flink stop --savepointPath f74c92af01ed51af94e530ee0e208d7c
而这个用于开始保存点
../bin/flink run flink-andy-12.3.0.jar --savepointPath file:/{...}/savepoint-f74c92-6acdb05afd11
关于我应该做什么的任何想法?