由于代码中的错误或缺乏验证,进入 Flink 作业的数据可能会触发异常。我的目标是提供一致的异常处理方式,我们的团队可以在 Flink 作业中使用,不会导致生产中的任何停机。
重启策略似乎不适用于这里:
- 简单的重启不会解决问题,我们会陷入重启循环
- 我们不能简单地跳过事件
- 它们可能对 OOME 或一些暂时性问题有好处
- 我们不能添加自定义的
“keyBy”函数中的 try/catch 块并不能完全帮助:
- 处理异常后无法跳过“keyBy”中的事件
示例代码:
env.addSource(kafkaConsumer)
.keyBy(keySelector) // must return one result for one entry
.flatMap(mapFunction) // we can skip some entries here in case of errors
.addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
我希望能够跳过导致“keyBy”问题的事件的处理以及应该只返回一个结果的类似方法。