我有一个 java spring 应用程序,它基于创建拓扑结构的 DTO 将拓扑提交给风暴(1.1.2)nimbus。
除了非常大的窗户外,这很好用。我正在用几个不同的滑动和翻滚窗口对其进行测试。除了每 15 分钟推进一次的 24 小时滑动窗口之外,没有人给我任何问题。该拓扑将从 Kafka 接收约 250 条消息/秒,并使用一个简单的时间戳提取器将它们窗口化,延迟为 3 秒(就像我正在测试的所有其他拓扑一样)。
我已经与工人和内存限额一起玩过,试图解决这个问题,但我的默认配置是 1 个工人,堆大小为 2048mb。我也尝试过减少影响最小的滞后。
我认为窗口大小可能变得太大并且工作人员内存不足,这会延迟心跳或 Zookeeper 连接签入,这反过来又会导致 Nimbus 杀死工作人员。
发生的情况是每隔一段时间(大约 11 个窗口前进),Nimbus 日志报告该拓扑的执行器“不活动”,并且该拓扑的工作日志显示KeeperException
拓扑无法与 Zookeeper 通信或java.lang.ExceptionInInitializerError:null
与巢PrivelegedActionException
。
当拓扑被分配一个新的工作人员时,我正在做的聚合丢失了。我认为这是因为窗口至少保存 250*60*15*11 (messagesPerSecond*secondsPerMinute*15mins*windowAdvancesBeforeCrash) 消息,每个消息大约 84 个字节。要完成整个窗口,它将最终成为 250*60*15*97 条消息(messagesPerSecond*secondsPerMinute*15mins*15minIncrementsIn24HoursPlusAnExpiredWindow)。如果我的数学是正确的,这大约是 1.8gbs,所以我觉得工人内存应该覆盖窗口,或者至少超过 11 个窗口推进值。
我可以稍微增加内存,但不会太多。我还可以减少内存/工作人员的数量并增加工作人员/拓扑的数量,但我想知道我是否缺少一些东西?我是否可以增加工作人员的心跳时间,以便执行者在被杀之前有更多时间签到,或者由于某种原因这会很糟糕?如果我更改了心跳 if 将在拓扑的配置图中。谢谢!