0

我想先通过下面的场景来解释我的问题陈述。

场景: 我正在使用 flink+java8 的 flink 的 PROCESS_CONTINOUS 模式进行连续文件读取。

这实际上是一种批量读取功能,其中不同的文件将在一天的不同时间收到。因此,假设 file_1.csv 在下午 3:00 到达,那么我的 flink 作业将读取此文件。file-2.csv 再次在下午 3:30 到达,然后 flink 作业也将读取此文件,并且该过程将继续以这种方式工作,直到作业停止。我们将这些数据下沉到 Kafka。

问题: 当我重新启动 flink 作业时,它开始读取所有先前读取的文件的数据。这意味着我在重新启动作业时一次又一次地获得相同的记录。

有没有办法防止数据重复?

4

1 回答 1

2

听起来您在重新启动时正在丢弃作业的状态。如果您通过从检查点或保存点重新启动来进行有状态重新启动,那么新作业应该从前一个作业停止的地方开始。

有关更多信息,请参阅https://ci.apache.org/projects/flink/flink-docs-stable/docs/try-flink/flink-operations-playground/#upgrading--rescaling-a-job

于 2021-09-08T00:38:38.953 回答