0

我有一个 java spring 应用程序,它基于创建拓扑结构的 DTO 将拓扑提交给风暴(1.1.2)nimbus。

除了非常大的窗户外,这很好用。我正在用几个不同的滑动和翻滚窗口对其进行测试。除了每 15 分钟推进一次的 24 小时滑动窗口之外,没有人给我任何问题。该拓扑将从 Kafka 接收约 250 条消息/秒,并使用一个简单的时间戳提取器将它们窗口化,延迟为 3 秒(就像我正在测试的所有其他拓扑一样)。

我已经与工人和内存限额一起玩过,试图解决这个问题,但我的默认配置是 1 个工人,堆大小为 2048mb。我也尝试过减少影响最小的滞后。

我认为窗口大小可能变得太大并且工作人员内存不足,这会延迟心跳或 Zookeeper 连接签入,这反过来又会导致 Nimbus 杀死工作人员。

发生的情况是每隔一段时间(大约 11 个窗口前进),Nimbus 日志报告该拓扑的执行器“不活动”,并且该拓扑的工作日志显示KeeperException拓扑无法与 Zookeeper 通信或java.lang.ExceptionInInitializerError:null与巢PrivelegedActionException

当拓扑被分配一个新的工作人员时,我正在做的聚合丢失了。我认为这是因为窗口至少保存 250*60*15*11 (messagesPerSecond*secondsPerMinute*15mins*windowAdvancesBeforeCrash) 消息,每个消息大约 84 个字节。要完成整个窗口,它将最终成为 250*60*15*97 条消息(messagesPerSecond*secondsPerMinute*15mins*15minIncrementsIn24HoursPlusAnExpiredWindow)。如果我的数学是正确的,这大约是 1.8gbs,所以我觉得工人内存应该覆盖窗口,或者至少超过 11 个窗口推进值。

我可以稍微增加内存,但不会太多。我还可以减少内存/工作人员的数量并增加工作人员/拓扑的数量,但我想知道我是否缺少一些东西?我是否可以增加工作人员的心跳时间,以便执行者在被杀之前有更多时间签到,或者由于某种原因这会很糟糕?如果我更改了心跳 if 将在拓扑的配置图中。谢谢!

4

1 回答 1

1

这是由于工作人员内存不足造成的。从查看 Storm 代码。看起来 Storm 将窗口中的每条消息都作为一个元组(这是一个相当大的对象)。由于消息率高且窗口为 24 小时,内存量很大。

我通过使用一个初步的分桶螺栓来解决这个问题,该螺栓将在最初的 1 分钟窗口中分桶所有元组,这显着减少了主窗口上的负载,因为它现在每分钟接收一个元组。分桶窗口不会耗尽内存,因为它的窗口中一次只有一分钟的元组。

于 2018-12-07T16:06:43.503 回答