1

我有一个WordCount用 Flink (Scala) 编写的示例流示例。在其中,我想使用外部检查点来在发生故障时进行恢复。但它没有按预期工作。

我的代码如下:

object WordCount {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment
      .getExecutionEnvironment
      .setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))

    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000)

    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)

    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // prepare Kafka consumer properties
    val kafkaConsumerProperties = new Properties
    kafkaConsumerProperties.setProperty("zookeeper.connect", "localhost:2181")
    kafkaConsumerProperties.setProperty("group.id", "flink")
    kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")

    // set up Kafka Consumer
    val kafkaConsumer = new FlinkKafkaConsumer[String]("input", new SimpleStringSchema, kafkaConsumerProperties)

    println("Executing WordCount example.")

    // get text from Kafka
    val text = env.addSource(kafkaConsumer)

    val counts: DataStream[(String, Int)] = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .mapWithState((in: (String, Int), count: Option[Int]) =>
        count match {
          case Some(c) => ((in._1, c), Some(c + in._2))
          case None => ((in._1, 1), Some(in._2 + 1))
        })

    // emit result
    println("Printing result to stdout.")
    counts.print()

    // execute program
    env.execute("Streaming WordCount")
  }
}

我第一次运行程序后得到的输出是:

(hi, 1)
(hi, 2)

第二次运行程序后得到的输出是:

(hi, 1)

我的期望是第二次运行程序应该给我以下输出:

(hi, 3)

由于我是 Apache Flink 的新手,我不知道如何达到预期的效果。谁能帮助我实现正确的行为?

4

2 回答 2

5

如果应用程序在同一次执行中重新启动(定期、自动恢复),Flink 只会从最新的检查点重新启动。

如果您在 IDE 中取消本地执行环境中运行的作业,则会杀死整个集群,并且该作业无法自动恢复。相反,您需要重新启动它。为了从保存点(或外部检查点)重新启动新作业,您需要提供到持久保存点/检查点的路径。不确定本地执行环境是否可行。

IMO 更容易在本地 Flink 实例上而不是在 IDE 中进行检查点和恢复。

于 2019-04-15T11:00:33.770 回答
1

我之前遇到过同样的问题,但我能够使用 MiniCluster 使其工作。如此处所述 - http://mail-archives.apache.org/mod_mbox/flink-user/201702.mbox/%3CCAO_f5ND=0f+uBbReSfThMBi-bnY4BjGBozo3fzEsZujiovb_-g@mail.gmail.com%3E

我在文档中没有找到很多关于 MiniCluster 的文档,所以不确定它是否是推荐的方式。

在完全重新启动作业时,我必须编写一小段代码来识别存储在具有 _metadata 目录的检查点目录 (/jobId/chk-*) 下的最新检查点。然后用于streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(s)从该检查点恢复状态。

于 2020-12-17T00:31:48.110 回答