0

我正在尝试设置一个 logstash 工作程序,它从一个 amqp/rabbitmq 队列中获取所有消息,过滤一些消息以发送到 statsD,但也将所有消息发送到弹性搜索。以下实现仅不向 ElasticSearch 发送任何消息。

input {
  rabbitmq {
    host => "amqp-host"
    queue => "elasticsearch"
    key => "elasticsearch"
    exchange => "elasticsearch"
    type => "all"
    durable => true
    auto_delete => false
    exclusive => false
    format => "json_event"
    debug => false
  }
}

filter {
    grep {
      add_tag => "grepped"
      match => ["@message", "Execution of .*? took .* sec"]
    }

    grok {
        tags => ["grepped"]
        add_tag => "grokked"
        pattern => "Execution of %{DATA:command_name} took %{DATA:response_time} sec"
    }

    mutate {
        tags => ["grepped", "grokked"]
        lowercase => [ "command_name" ]
        add_tag => ["mutated"]
    }
}

output {
  elasticsearch_river {
    type => "all"
    rabbitmq_host => "amqp-host"
    debug => false
    durable => true
    persistent => true
    es_host => "es-host"
    exchange => "logstash-elasticsearch"
    exchange_type => "direct"
    index => "logs-%{+YYYY.MM.dd}"
    index_type => "%{@type}"
    queue => "logstash-elasticsearch" 
  }

 statsd {
    type => "command-filter"
    tags => ["grepped", "grokked", "mutated"]
    host => "some.domain.local"
    port => 1234
    sender => ""
    namespace => ""
    timing => ["prefix.%{command_name}.suffix", "%{response_time}"]
    increment => ["prefix.%{command_name}.suffix"]
  }
}

有什么包罗万象的过滤器吗?或者一种安排标签的方法,以便过滤一些消息但全部转发到 ES?

4

3 回答 3

1

克隆过滤器派上用场了。以下是我生成的配置文件。

input {
  rabbitmq {
    host => "amqp-host"
    queue => "elasticsearch"
    key => "elasticsearch"
    exchange => "elasticsearch"
    type => "all"
    durable => true
    auto_delete => false
    exclusive => false
    format => "json_event"
    debug => false
  }
}

filter {
    clone {
        exclude_tags => ["cloned"]
        clones => ["statsd", "elastic-search"]
        add_tag => ["cloned"]
    }

    grep {
      type => "statsd"
      add_tag => "grepped"
      match => ["@message", "Execution of .*Command took .* sec"]
    }

    grok {
        type => "statsd"
        tags => ["grepped"]
        add_tag => "grokked"
        pattern => "Execution of %{DATA:command_name}Command took %{DATA:response_time} sec"
    }

    mutate {
        type => "statsd"
        tags => ["grepped", "grokked"]
        lowercase => [ "command_name" ]
        add_tag => ["mutated"]
    }
}

output {
  elasticsearch_river {
    type => "all"
    rabbitmq_host => "amqp-host"
    debug => false
    durable => true
    persistent => true
    es_host => "es-host"
    exchange => "logstash-elasticsearch"
    exchange_type => "direct"
    index => "logs-%{+YYYY.MM.dd}"
    index_type => "%{@type}"
    queue => "logstash-elasticsearch" 
  }

  statsd {
    type => "statsd"
    tags => ["grepped", "grokked", "mutated"]
    host => "some.host.local"
    port => 1234
    sender => ""
    namespace => ""
    timing => ["commands.%{command_name}.responsetime", "%{response_time}"]
    increment => ["commands.%{command_name}.requests"]
  }
}
于 2013-06-27T17:49:00.857 回答
0

在您的情况下,clone过滤器实际上是不必要的。

有几件事我会推荐:

  • 请简化您的配置,直到您的“基础”工作。暂时不要设置可选参数
  • 确保type匹配
  • 使用标签,它们是救生员
  • 在“grepping”时,您应该保持一致并始终使用grok或始终使用grep. 我的偏好是grok

其他一些提示... 根据邮件设置的标签选择您要对邮件执行的操作。这是我的一个示例配置片段:

grok{
   type => "company"
   pattern => ["((status:new))"]
   add_tag => ["company-vprod-status-new", "company-vprod-status-new-%{company}", "company-vprod-status-new-%{candidate_username}"]
   tags=> "company-vprod-status-change"
}
output {
 elasticsearch {
    host => "127.0.0.1"
    type => "company"
    index => "logstash-syslog-%{+YYYY.MM.dd}"
 }
 statsd {
    host => "graphite.test.company.com"
    increment => ["vprod.statuses.new.all", "vprod.statuses.new.%{company}.all"]
    tags => ["company-vprod-status-new"]   
 }
}

另外,要密切注意你的类型。如果您将type属性设置为不存在的值,则该块将永远不会被触发。这就是为什么我更喜欢使用标签的原因,除非有明确的理由使用类型。

于 2013-06-28T10:37:02.460 回答
0

您还可以添加:

drop => false

在 grep 节的末尾(如果您仍在使用 grep)

于 2013-11-05T20:10:26.957 回答