有几点我会预先自愿:
- 我是 Flink 的新手(现在已经使用了大约一个月)
- 我正在使用 Kinesis Analytics(AWS 托管的 Flink 解决方案)。无论如何,这并没有真正限制 Flink 的多功能性或容错选项,但我还是会说出来。
我们有一个相当直接的滑动窗口应用程序。键控流通过特定键(例如 IP 地址)组织事件,然后在 ProcessorFunction 中处理它们。我们主要使用它来跟踪事物的数量。例如,过去 24 小时内特定 IP 地址的登录次数。每 30 秒,我们计算窗口中每个键的事件,并将该值保存到外部数据存储中。状态也会更新以反映该窗口中的事件,以便旧事件过期并且不占用内存。
有趣的是,基数不是问题。如果我们有 20 万人登录,在 24 小时内,一切都是完美的。当一个 IP 在 24 小时内登录 20 万次时,事情开始变得棘手。此时,检查点开始花费越来越长的时间。一个平均检查点需要 2-3 秒,但根据这种用户行为,检查点开始需要 5 分钟,然后是 10 分钟,然后是 15 分钟,然后是 30 分钟,然后是 40 分钟,等等。
令人惊讶的是,应用程序可以在这种情况下平稳运行一段时间。也许 10 或 12 个小时。但是,迟早检查点会完全失败,然后我们的最大迭代器年龄开始飙升,并且没有新的事件被处理等等。
在这一点上,我尝试了一些事情:
- 在问题上扔更多的金属(自动缩放也打开了)
- 大惊小怪 CheckpointingInterval 和 MinimumPauseBetweenCheckpoints https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
- 重构以减少我们存储的状态的足迹
(1) 并没有真正做太多。(2) 这似乎有所帮助,但随后又一次比我们之前看到的更大的流量高峰消除了任何好处 (3) 目前尚不清楚这是否有帮助。我认为我们的应用程序内存占用与你想象的 Yelp 或 Airbnb 相比相当小,它们都使用 Flink 集群来处理大型应用程序,所以我无法想象我的状态真的有问题。
我会说我希望我们不必深刻改变对应用程序输出的期望。这个滑动窗口是一个非常有价值的数据。
编辑:有人问我的状态是什么样的 ValueState[FooState]
case class FooState(
entityType: String,
entityID: String,
events: List[BarStateEvent],
tableName: String,
baseFeatureName: String,
)
case class BarStateEvent(target: Double, eventID: String, timestamp: Long)
编辑:我想强调用户大卫安德森在评论中所说的话:
有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。
这是必不可少的。对于其他试图走这条路的人,我找不到一个可行的解决方案,它不会将事件存储在某个时间片中。我的最终解决方案是将事件分成 30 秒的批次,然后按照 David 的建议将它们写入地图状态。这似乎可以解决问题。对于我们的高负载期,检查点保持在 3mb 并且它们总是在一秒钟内完成。