0

我有一个用例,其中大量日志将被 apache flink CEP 消耗。我的用例是找到蛮力攻击和端口扫描攻击。这里的挑战是,在普通 CEP 中,我们将值与“event” = login 之类的常量进行比较。在这种情况下,标准是不同的,因为在蛮力攻击的情况下,我们有如下标准。

用户名是常量并且 event="login failure" (分隔事件在 5 分钟内发生 5 次)。这意味着在 5 分钟内收到 5 次相同用户名的登录失败事件的日志

对于端口扫描,我们有以下标准。

ip 地址是常量,dest 端口是可变的(分隔符是事件在 1 分钟内发生 10 次)。这意味着在 1 分钟内收到了 10 个不同端口的具有恒定 IP 地址的日志。

4

1 回答 1

1

使用 Flink,当您想单独处理一个用户名或一个 IP 地址之类的事件时,执行此操作的方法是通过一个键对流进行分区,使用keyBy(). Flink 文档中的培训材料有一个关于 Keyed Streams的部分,更详细地解释了 DataStream API 的这一部分。keyBy()与 SQL中的 a 概念大致相同GROUP BY,如果有帮助的话。

使用 CEP,如果您首先对流进行键控,那么模式将针对键的每个不同值分别匹配,这就是您想要的。

但是,对于这个用例,我会推荐 Flink SQL,而不是 CEP,可能与MATCH_RECOGNIZE结合使用。MATCH_RECOGNIZE 是一个更高级别的 API,建立在 CEP 之上,并且更易于使用。结合SQL,结果相当强大。

您可以在 Ververica 的 github 帐户中找到一些 Flink SQL 培训材料和示例(包括使用 MATCH_RECOGNIZE 的示例) 。

更新

需要明确的是,我不会将 MATCH_RECOGNIZE 用于这些特定规则;此用例既不需要它也不需要 CEP。我提到了它,以防您有其他有用的规则。(我在这种情况下不推荐 CEP 的原因是实施 distinct 约束可能会很麻烦。)

例如,对于端口扫描情况,您可以执行以下操作:

SELECT e1.ip, COUNT(DISTINCT e2.port) 
FROM events e1, events e2 
WHERE e1.ip = e2.ip AND timestampDiff(MINUTE, e1.ts, e2.ts) < 1 
GROUP BY e1.ip HAVING COUNT(DISTINCT e2.port) >= 10;

登录案例类似,但更简单。

请注意,在使用流式 SQL 时,您应该考虑一下状态保留

进一步更新

此查询可能会多次返回给定的 IP 地址,但不希望生成多个警报。

这可以通过将匹配的 IP 地址插入到警报表中来处理,并且只为不存在的 IP 生成警报。

或者 SQL 查询的输出可以由使用 DataStream API 实现的去重器处理,类似于Flink 文档中的示例。如果您只想在一段时间内抑制重复警报,请使用 aKeyedProcessFunction而不是 a RichFlatMapFunction,并使用 Timer 在需要重新启用给定 IP 的警报时清除状态。

另一个更新(关于 CEP 和独特性)

用 CEP 实现这一点应该是可能的。您需要通过 IP 地址对流进行加密,并拥有一个必须在一分钟内匹配的模式。

模式大概是这样的:

Pattern<Event, ?> pattern = Pattern
  .<Event>begin("distinctPorts")
  .where(iterative condition 1)
  .oneOrMore()
  .followedBy("end")
  .where(iterative condition 2)
  .within(1 minute)

如果添加到模式的事件与所有先前匹配的事件具有不同的端口,则第一个迭代条件返回 true。有点类似于docs 中的示例。

size("distinctPorts") >= 9如果并且此事件还有另一个不同的端口,则第二个迭代条件返回 true 。

请参阅此 Flink Forward 演讲youtube 视频),在演讲结束时有一个类似的示例。

如果您尝试此操作并遇到困难,请提出一个新问题,向我们展示您尝试过的内容以及卡住的位置。

于 2020-09-29T20:07:37.000 回答