2

由于代码中的错误或缺乏验证,进入 Flink 作业的数据可能会触发异常。我的目标是提供一致的异常处理方式,我们的团队可以在 Flink 作业中使用,不会导致生产中的任何停机。

  1. 重启策略似乎不适用于这里:

    • 简单的重启不会解决问题,我们会陷入重启循环
    • 我们不能简单地跳过事件
    • 它们可能对 OOME 或一些暂时性问题有好处
    • 我们不能添加自定义的
  2. “keyBy”函数中的 try/catch 块并不能完全帮助:

    • 处理异常后无法跳过“keyBy”中的事件

示例代码:

env.addSource(kafkaConsumer)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");

我希望能够跳过导致“keyBy”问题的事件的处理以及应该只返回一个结果的类似方法。

4

2 回答 2

1

在这种情况下,您可以保留一个特殊值(如“NULL”)keyBy以返回吗?那么你的flatMap函数在遇到这样的值时可以跳过吗?

于 2019-04-02T10:10:13.447 回答
1

除了@phanhuy152 的建议(这对我来说似乎完全合法),为什么不在filter之前keyBy呢?

env.addSource(kafkaConsumer)
    .filter(invalidKeys)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
于 2019-04-02T12:20:53.547 回答