我正在使用带有 logstash-logback-encoder 的 ELK 将日志推送到 Logstash。现在我想使用相同的堆栈,即带有 logstash-logback-encoder 的 ELK 进行分析。
流动:
API(Create User)----> Commit data to RDBMS ----->
Callback Listener(on post persist and post update) --->
Logger.info("IndexName: {} . DocId: {} .User json: {}", "Customer", user.getID(), user.getJson());
记录器.info(); logstash-logback-encoder,将数据推送到Logstash,Logstash将数据推送到ES。
我的logstash.conf如下:
input {
tcp {
port => 5044
codec => multiline {
what => "previous"
}
}
}
filter{
grok {
match => ["message", "(?<index_name>(?<=IndexName: ).*?(?=\s))"]
match => ["message", "(?<doc_id>(?<=DocId: ).*?(?=\s))"]
break_on_match => false
remove_tag => ["_grokparsefailure","multiline"]
}
mutate {
gsub => ['message', "\t", " "]
gsub => ['message',"\e\[(\d*;)*(\d*)m"," "]
}
}
output {
if [index_name] == "Customer" {
elasticsearch {
hosts => ["localhost:9200"]
index => "analytics-customers"
document_id => "%{doc_id}"
}
}else {
elasticsearch {
hosts => ["localhost:9200"]
}
}
stdout { codec => rubydebug }
}
我的问题是,如果我想使用 Logstash 进行分析,那么我必须使用 grok 解析 json。随着我拥有的表和字段的数量,logstash.conf 将变得非常庞大。
有没有一种方法可以在 logstash.conf 中应用 grok 模板,我可以根据索引名称调用它。喜欢:
grok {
match => ["message", "(?<index_name>(?<=IndexName: ).*?(?=\s))"]
if(index_name=="User"){
//Invoke User template which will fetch/create fields from passed json.
}
if(index_name=="Order"){
//Invoke Order template which will fetch/create fields from passed json.
}
}