0

我有每分钟聚合的站点状态(连接/断开连接)的流,其中聚合被抑制,并且对于每个分钟键,我们只有一个记录。现在,当我们的状态从一分钟变为另一分钟时,我想写入新的“警报”主题消息。

例如 17/03/21 12:00 - 已连接,17/09/21 12:01 - 17/09/21 断开连接,我们要在 12:01 写入断开连接警报。

我尝试做的是获取stream1,并将其重用于时间延长1分钟的stream2。然后与窗口 0 进行连接,以便将每个聚合与前一分钟的结果连接起来,并过滤具有不同状态的聚合

例如 stream1 17/03/21 12:00 - 已连接,17/09/21 12:01 - 已断开 stream2 将是 17/03/21 12:01 - 已连接,17/09/21 12:02 - 已断开零窗口将仅加入 17/09/21 12:01 状态不同,因此记录将保留在流中,我会将其写入警报主题。

问题是加入不是我所期望的记录,从 stream1 到 12:01 从 stream2 用了 12:00,从 12:02 用了 12:01(没有宽限的零窗口)。好像我对时间所做的更改并没有影响,并且加入仍然认为12:01是12:00。我错过了什么?

4

0 回答 0