我有一个 flink 批处理作业,它从 S3 读取一个非常大的镶木地板文件,然后它将一个 json 下沉到 Kafka 主题中。
问题是如何使文件读取过程有状态?我的意思是每当工作中断或崩溃时,工作应该从以前的阅读状态开始?我不想在作业重新启动时向 Kafka 发送重复的项目。
这是我的示例代码
val env = ExecutionEnvironment.getExecutionEnvironment
val input = Parquet.input[User](new Path(s"s3a://path"))
env.createInput(input)
.filter(r => Option(r.token).getOrElse("").nonEmpty)