2

我正在使用带有 logstash-logback-encoder 的 ELK 将日志推送到 Logstash。现在我想使用相同的堆栈,即带有 logstash-logback-encoder 的 ELK 进行分析。

流动:

API(Create User)----> Commit data to RDBMS -----> 
Callback Listener(on post persist and post update) ---> 
Logger.info("IndexName: {} . DocId: {} .User json: {}", "Customer", user.getID(), user.getJson());

记录器.info(); logstash-logback-encoder,将数据推送到Logstash,Logstash将数据推送到ES。

我的logstash.conf如下:

input {
tcp {
 port => 5044
 codec =>  multiline {
 what => "previous"
   }
 }  
}
filter{
grok {
   match => ["message", "(?<index_name>(?<=IndexName: ).*?(?=\s))"]
   match => ["message", "(?<doc_id>(?<=DocId: ).*?(?=\s))"]
   break_on_match => false
   remove_tag => ["_grokparsefailure","multiline"]
}
mutate {
  gsub => ['message', "\t", " "]
  gsub => ['message',"\e\[(\d*;)*(\d*)m"," "]
 }
}
output {
 if [index_name] == "Customer" {
         elasticsearch {
                hosts => ["localhost:9200"]
                index => "analytics-customers"
                document_id => "%{doc_id}"
                }
         }else {
          elasticsearch {
          hosts => ["localhost:9200"]
         }
      }
   stdout { codec => rubydebug }
   }

我的问题是,如果我想使用 Logstash 进行分析,那么我必须使用 grok 解析 json。随着我拥有的表和字段的数量,logstash.conf 将变得非常庞大。

有没有一种方法可以在 logstash.conf 中应用 grok 模板,我可以根据索引名称调用它。喜欢:

grok {
match => ["message", "(?<index_name>(?<=IndexName: ).*?(?=\s))"]
if(index_name=="User"){
 //Invoke User template which will fetch/create fields from passed json.
}
if(index_name=="Order"){
 //Invoke Order template which will fetch/create fields from passed json.
}
}
4

1 回答 1

0

如果您可以设法在一行中获取日志,那将是最好的方法。因为您可以将编解码器更改为“json_lines”,并且所有内容都会自动解析!

否则,您可以使用 IF(在此处描述)

例子:

if [subsystem] == "http" {
    mutate{ ... }
    grok{ ... }
}
于 2017-10-23T08:48:20.297 回答