让我解释一个我需要处理的场景。让我们假设三个设备 A、B、C 正在向 flink CEP 发送日志进行处理。让我们假设模式为 A,然后是 5 分钟 B,然后是 5 分钟后的 C。让我们假设 B 设备停机并在 50 分钟后发送日志的场景。所以在这种情况下,所有事件都将被丢弃。我只是想知道 flink 中是否有任何支持将状态维持在特定定义的时间间隔(假设在我的情况下为 1 天,这意味着 A 和 C 日志将存储 1 天后日志将是在不匹配的情况下丢弃)。请从 CEP 的角度提出可行性。
问问题
36 次
1 回答
0
until
据我所知,没有比这更具体的了within
,但这些都是用来指定 t 的。这取决于您的确切设置,但是如果您将所有数据放入一个主题中,则可能很难防止设备长时间停机。您可以尝试修改水印生成逻辑,但这意味着它通常会延迟输出。
在这种情况下,您可以考虑使用ProcessFunction
更灵活的自定义逻辑,并允许您以更好的粒度处理状态。
编辑:
所以,基本上你需要创建一个状态来保存部分匹配,这取决于它可能是ListState
或的情况ValueState
,然后简单地将你找到的任何部分匹配放在那里。因此,如果您想要 A -> B -> C,那么如果您有 A,您将检查并将其放入状态,然后如果您收到 B,您可以检查时间戳并将其附加到状态,最后如果您有 C您可以发出整个匹配并清除状态。
如果您在那里设置 stateTTL,这只会断言状态将在一段时间未读/写后自动清除。
另请注意,如果模式不是很复杂,这是有道理的,否则编写逻辑很快就会成为一场噩梦。
于 2020-09-23T14:08:13.563 回答