背景:
我们有一个 Flink 管道,它由多个源、多个接收器和管道沿线的多个运算符组成,这些运算符还更新数据库。
为了这个问题并使其更简单,让我们假设我们有一个看起来像这样的管道:
Source -> KeyBy -> FlatMap -> Filter -> Sink
该管道应该允许我们收听有关某些数据更改的通知。(每个通知都包含一个 ID)对于每个通知,我们从数据库中读取数据,运行算法并更新相同的数据库行。之后,我们还发出数据变化的幅度。只有当数据变化幅度足够大时,我们才会向另一个 Kafka 主题发出通知。
- Source 订阅 Kafka 主题以侦听更改数据 ID 的通知。
- KeyBy 是通过 ID 键入的,以确保同一 ID 不会被 2 个操作员实例同时处理。
- 给定 ID,FlatMap 从 DB 读取数据,运行算法并更新相同的 DB 行。它发出变化幅度。它是 FlatMap 而不是 Map,因为在某些情况下,我们不想发出任何变化幅度,例如,如果我们有一些特定的错误。
- 过滤器过滤流的幅度小于某个阈值
- Sink 正在将过滤后的通知发送到另一个 Kafka 主题。
问题:
我们希望以一次性语义运行管道。从我们看到的情况来看,Flink 支持 Kafka 源、Kafka 接收器以及中间的有状态或有状态操作符的一次性语义。我们找不到任何地方解释如何使用您在管道中更新的资源执行一次。有一个TwoPhaseCommitSinkFunction允许创建一个允许完全一次语义的接收器函数。
我们不能使用它,因为我们想更新数据库,然后向 Kafka 发出更改通知。在 2 个单独的接收器中执行此操作将产生竞争条件,我们可以在数据库实际更新之前收到幅度通知。
我们错过了什么吗?有没有办法在 Map/FlatMap 运算符中实现 2 阶段提交?还有其他解决方案吗?
谢谢!