3

我正在使用 ELK(logstash、ES、Kibana)堆栈进行日志分析,并使用 Riemann 进行警报。我有日志,其中用户是由 logstash 解析的字段之一,我将事件从 riemann 输出插件发送到 riemann。

Logstash 解析日志,用户是字段之一。例如:日志解析

Timestamp              user     command-name
 2014-06-07...         root      sh ./scripts/abc.sh
 2014-06-08...         sid       sh ./scripts/xyz.sh
 2014-06-08...         abc       sh ./scripts/xyz.sh
 2014-06-09...         root      sh ./scripts/xyz.sh

日志存储:

riemann {
    riemann_event => {
        "service"     => "logins"
        "unique_user" => "%{user}"
    }
}

因此用户值将是:root、sid、abc、root、sid、def 等。

所以我按用户拆分流,即每个唯一用户一个流。现在,我想在唯一用户数超过 3 时发出警报。我写了以下内容,但没有达到我的目的。

黎曼:

(streams

 (where (service "logins")
  (by :unique_user
    (moving-time-window 3600 
     (smap (fn [events]
      (let
        [users (count events)]
         (if (> users 3)
          (email "abc@gmail.com")       
     ))))))))

我是 Riemann 和 Clojure 的新手。任何帮助表示赞赏。

4

1 回答 1

1

email返回一个流。因此,要使其工作,您必须要么将其用作流,将其作为参数传递给另一个流,要么call-rescue直接将事件发送给它。此外,用于接收来自多个源(例如您的警报目标)的事件的流应该创建一次,并存储在一个变量中以供重复使用。

第一种方法,仅使用抽象流:

(let [alert (email "abc@gmail.com")]
  (streams
    (where (service "logins")
      (by :unique_user
        (moving-time-window 3600
          (smap folds/count
            (where (> metric 3) alert)))))))

第二种方法,使用call-rescue

(let [alert (email "abc@gmail.com")]
  (streams
    (where (service "logins")
      (by :unique_user
        (moving-time-window 3600
          (fn [events]
            (when (> (count events) 3)
              (call-rescue (last events) alert))))))))
于 2017-01-16T20:39:38.110 回答