0

我正在编写一个 Flink 程序,在重新启动新作业时无法保存我的状态变量。

我使用来自 Kafka 的连接器制作了一个简单的程序,在该连接器中接收消息和带有 valueState 变量的 RichFlatMap。这个变量是一个整数,每收到一条消息就加 1。

当值在 15 左右时,我创建了一个停止保存点,但是当我从该保存点恢复它时,计数器又回到 1。

Streamingjob.java:

KeyedStream<JsonNode, Object> eventsByKey = env
                .addSource(consumer).name("Producer Topic Source")
                .keyBy(e -> {...});

eventsByKey 
        .flatMap(new Test())
        .uid("test-id")

测试.java:

public class Test extends RichFlatMapFunction<JsonNode, JsonNode> {

    private transient ValueState<Integer> persistence;

    @Override
    public void flatMap(JsonNode node, Collector<JsonNode> collector) throws Exception {
        if (persistence.value() == null) persistence.update(1);
        String device_id = node.get("data").get("device_id").toString();
        System.out.println();
        System.out.println(device_id);
        System.out.println(persistence.value());
        System.out.println();
        persistencia.update(persistence.value() + 1);
    }

    @Override
    public void open(Configuration config) {
        this.persistence = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "prueba", // the state name
                Integer.class));
    }
}

这是我用于停止保存点的命令 ../bin/flink stop --savepointPath f74c92af01ed51af94e530ee0e208d7c

而这个用于开始保存点 ../bin/flink run flink-andy-12.3.0.jar --savepointPath file:/{...}/savepoint-f74c92-6acdb05afd11

关于我应该做什么的任何想法?

4

1 回答 1

0

要从保存点重新启动,您需要指定--fromSavepoint,而不是--savepointPath。(文档

换句话说:

$ ./bin/flink run \
      --fromSavepoint /{...}/savepoint-f74c92-6acdb05afd11 \
      flink-andy-12.3.0.jar
于 2021-05-25T09:51:46.683 回答