4

我有一个似乎很常见的问题,但我无法弄清楚 Beam 推荐的解决方案是什么。

我有一个原始事件流,我正在寻找两个单独的事件来满足滑动窗口(60 分钟)内的条件,以便它“触发”警报。

这很容易做到SlidingWindows,但是问题在于它的滑动性质,我有效地在多个窗口中获得了该警报。我如何最终获得仅输出一次此类警报的 PCollection(在特定时间范围/冷却持续时间内)?

我首先认为最近的状态处理功能将是我的解决方案,但后来意识到它只能在窗口内工作。侧面输入也是如此。所以在我看来,我需要一种打破窗户并在一个(可能的会话)窗口中处理警报“触发”的方法。但是文档没有提到任何有效地将元素重新分配给新窗口的方法

4

2 回答 2

2

有趣的应用!

简单总结一下:

  • 对于您的用例来说,这听起来像“滑动窗口”意味着不断滑动。您可以选择最小粒度,但不一定是自然的。
  • 您感兴趣的每组事件应该只产生一个输出。

有几种方法可以解决这个问题,具体取决于应用程序的其余部分。

一种方法是将数据留在全局窗口中并使用状态。您将不得不自己管理延迟 - 删除为时已晚的元素,考虑数据乱序等,并且通常保持您的状态有界。

另一种方法是使用带有Combineor 状态的滑动窗口(基本上,您已经尝试过),然后重新窗口化警报并进行重复数据删除。您可以为此使用固定窗口,因为警报应该具有确定的时间戳;窗口的末尾将控制何时自动收集状态,这样很方便。

于 2017-04-10T17:28:42.623 回答
2

我最终得到了一个重新窗口策略,类似于@Kenn 建议的。

所以我有来自滑动窗口集合的警报,我重新窗口化到会话窗口

.apply(Window.remerge())            
.apply(Window.into(Sessions.withGapDuration(Duration.standardHours(1))))

在那个窗口集合中,我可以只做一个groupBy并因此获得所有Alerts会话,在其中我可以应用我的冷却逻辑,即每小时只发出一个警报。

于 2017-04-13T18:41:37.557 回答