
https://stackoverflow.com/a/51825609/16120054

Hello everyone,

Based on the solution in the link above, does this need pipeline.workers set to 1 in the conf settings to work?

Could someone please advise?


3 Answers


I was thinking of another way. Sorting all my dev data through one pipeline may be an extra step. Here is the process:

  1. Make a pipeline that grabs everything with status = 0/1 from all the devs; let's name it grabber.conf.
  2. grabber.conf will do the grok and output to a status.log on disk. This status.log will hold all the status data for all the devs.
  3. grabber.conf will have input { exec { command => "sort...." } }. The sort will be pointed at status.log to put all the dates in order and save the file back in place. (The interval for the exec is trivial.)
  4. Create another pipeline that runs the elapsed filter over all the statuses from the devs, adding tags to them as start and end. Let's name it durations.conf.
  5. So whatever order the logs are collected and saved into status.log in, the sort command will put them in date order, ready for the elapsed calculation in durations.conf.

fingerprint will also be added to durations.conf. A rough sketch of both pipelines follows below.
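To make the idea concrete, here is a rough, untested sketch of what steps 3 and 4 could look like. Everything in it is an assumption for illustration: the paths, the sort command, the 300-second interval, and the fingerprint settings are placeholders, only one elapsed direction is shown, and the collection/grok side of grabber.conf is left out.

# grabber.conf (sketch)
# Besides grokking and writing status.log, an exec input periodically
# re-sorts status.log in place (step 3). Path and interval are placeholders.
input {
    exec {
        command => "sort -o /home/dev/status.log /home/dev/status.log"
        interval => 300
    }
}
output {
    # the exec event itself carries nothing useful
    stdout { codec => dots }
}

# durations.conf (sketch)
# Reads the sorted status.log and runs elapsed on it (step 4).
input {
    file {
        path => "/home/dev/status.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
    # hash each line to help spot duplicates
    fingerprint {
        source => "message"
        target => "[@metadata][fp]"
        method => "SHA1"
    }
    grok {
        match => { "message" => "(?<logtime>%{DAY} %{MONTH} %{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}).*= %{BASE10NUM:status}" }
    }
    date {
        match => [ "logtime", "EEE MMM dd HH:mm:ss.SSS" ]
    }
    if [status] == "1" { mutate { add_tag => [ "up" ] } }
    else               { mutate { add_tag => [ "down" ] } }
    # only the up -> down direction shown here; assumes a host field is
    # present on each event, e.g. written by grabber.conf
    elapsed {
        start_tag => "up"
        end_tag => "down"
        unique_id_field => "host"
        timeout => 86400
    }
}
output {
    stdout { codec => rubydebug }
}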

Let me know if this could work as an alternative to my query.

Answered 2021-06-10T22:13:59.130
input {
    file {
        path => "/home/dev*/status.log*"
        exclude => "status.log.10"
        start_position => "beginning"
        sincedb_path => "/dev/null"
#       sincedb_path => "/home/dev/db/devdb"
        file_sort_by => "path"
        file_sort_direction => "desc"
    }
}

output {
    stdout { codec => rubydebug }
}

filter {

    # Derive the host name from the file path (dev1 .. dev4)
    if [path] =~ "dev1" {
        mutate { replace => { "host" => "dev1" } }
    }
    else if [path] =~ "dev2" {
        mutate { replace => { "host" => "dev2" } }
    }
    else if [path] =~ "dev3" {
        mutate { replace => { "host" => "dev3" } }
    }
    else if [path] =~ "dev4" {
        mutate { replace => { "host" => "dev4" } }
    }

    if [message] =~ "devManager" {

        # Extract the timestamp and the numeric status from the log line
        grok {
            match => { "message" => "(?<logtime>%{DAY} %{MONTH} %{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}).*= %{BASE10NUM:status}" }
        }
        date {
            match => [ "logtime", "EEE MMM dd HH:mm:ss.SSS" ]
        }

        # Map the numeric status to a readable value and tag the event with it
        if [status] == "0" {
            mutate { update => { "status" => "down" } }
        }
        else if [status] == "1" {
            mutate { update => { "status" => "up" } }
        }

        mutate {
            add_tag => [ "%{status}" ]
        }

        # up -> down interval: elapsed_time lands on the "down" event (time spent up)
        elapsed {
            start_tag => "up"
            end_tag => "down"
            unique_id_field => "host"
            timeout => 86400
        }

        # down -> up interval: elapsed_time lands on the "up" event (time spent down)
        elapsed {
            start_tag => "down"
            end_tag => "up"
            unique_id_field => "host"
            timeout => 86400
        }

        if "up" in [tags] and [host] {
            mutate {
                add_field => { "host_down" => "%{elapsed_time}" }
            }
            mutate {
                convert => { "host_down" => "float" }
            }
        }
        else if "down" in [tags] and [host] {
            mutate {
                add_field => { "host_up" => "%{elapsed_time}" }
            }
            mutate {
                convert => { "host_up" => "float" }
            }
        }

        mutate {
            rename => {
                "status"    => "%{host}_status"
                "host_up"   => "%{host}_up"
                "host_down" => "%{host}_down"
            }
            remove_field => [ "info", "@version" ]
        }
    }
    else {
        drop { }
    }
}

This is the conf file I am using with one worker. The path "dev*" covers folders dev1 through dev12 to read from.

A log sample is below:

/dev/status.log
Wed Jun 09 22:26:37.296  devManager: status = 1
Wed Jun 09 23:09:40.191  devManager: status = 0
Wed Jun 09 23:10:17.064  devManager: status = 0
Wed Jun 09 23:11:14.692  devManager: status = 1

@leandrojmp

Answered 2021-06-10T17:31:16.787

The aggregate filter needs pipeline.workers set to 1 to work correctly, and while the elapsed filter's documentation says nothing about the number of workers, it is also recommended to set pipeline.workers to 1 for it.

Both filters need the events to pass through the pipeline on the same thread, so you need pipeline.workers set to 1.
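For reference, this is usually done per pipeline in pipelines.yml or with the -w / --pipeline.workers command-line flag; the pipeline id and path below are only placeholders:

# pipelines.yml (sketch), placeholder id and path
- pipeline.id: durations
  path.config: "/etc/logstash/conf.d/durations.conf"
  pipeline.workers: 1

Equivalently, when testing from the command line, something like bin/logstash -f durations.conf -w 1 keeps all events for the elapsed/aggregate filters on a single worker thread.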

Answered 2021-06-10T12:47:15.490