6

我对logstash和弹性搜索很陌生。我正在尝试将日志文件存储在弹性搜索和平面文件中。我知道logstash 支持两种输出。但它们是同时处理的吗?还是通过工作定期完成?

4

3 回答 3

6

是的,您可以通过使用托运人配置上的“add_tag”命令标记和克隆您的输入来做到这一点。

input
{
    tcp     { type => "linux" port => "50000" codec => plain { charset => "US-ASCII" } }
    tcp     { type => "apache_access" port => "50001" codec => plain { charset => "US-ASCII" } }
    tcp     { type => "apache_error"  port => "50002" codec => plain { charset => "US-ASCII" } }
    tcp     { type => "windows_security" port => "50003" codec => plain { charset => "US-ASCII" } }
    tcp     { type => "windows_application" port => "50004" codec => plain { charset => "US-ASCII" } }
    tcp     { type => "windows_system" port => "50005" codec => plain { charset => "US-ASCII" } }
udp { type => "network_equipment" port => "514" codec => plain { charset => "US-ASCII" } }
udp { type => "firewalls" port => "50006" codec => plain }
}
filter
{
    grok    { match => [ "host", "%{IPORHOST:ipaddr}(:%{NUMBER})?" ] }
    mutate  { replace => [ "fqdn", "%{ipaddr}" ] }
    dns     { reverse => [ "fqdn", "fqdn" ] action => "replace" }
    if [type] == "linux"                    { clone { clones => "linux.log" add_tag => "savetofile" } }
    if [type] == "apache_access"            { clone { clones => "apache_access.log" add_tag => "savetofile" } }
    if [type] == "apache_error"             { clone { clones => "apache_error.log" add_tag => "savetofile" } }
    if [type] == "windows_security"         { clone { clones => "windows_security.log" add_tag => "savetofile" } }
    if [type] == "windows_application"      { clone { clones => "windows_application.log" add_tag => "savetofile" } }
    if [type] == "windows_system"           { clone { clones => "windows_system.log" add_tag => "savetofile" } }
    if [type] == "network_equipment"        { clone { clones => "network_%{fqdn}.log" add_tag => "savetofile" } }
if [type] == "firewalls"        { clone { clones => "firewalls.log" add_tag => "savetofile" } }
}
output
{
    #stdout { debug => true }
    #stdout { codec => rubydebug }
    redis   { host => "1.1.1.1" data_type => "list" key => "logstash" }
}

在您的主要 logstash 实例上,您将执行以下操作:

input {
    redis {
    host => "1.1.1.1" 
    data_type => "list" 
    key => "logstash" 
    type=> "redis-input"
    # We use the 'json' codec here because we expect to read json events from redis.
    codec => json
          }
    }
    output
    {
        if "savetofile" in [tags] {
            file {
                path => [ "/logs/%{fqdn}/%{type}" ] message_format => "%{message}"   
            } 
        }
        else { elasticsearch { host => "2.2.2.2" }
    }
}
于 2014-01-02T19:24:26.097 回答
0

仅供参考,您可以研究有关 logstash 事件的 logstash 事件的生命周期

输出工作者模型目前是单线程。输出将按照配置文件中定义的顺序接收事件。

但输出可能会决定在发布事件之前暂时缓冲事件。例如:输出将缓冲 2 或 3 个事件,然后将其写入文件。

于 2014-01-03T06:56:50.397 回答
0

首先你需要安装输出插件:

/usr/share/logstash/bin/logstash-plugin install logstash-output-elasticsearch
/usr/share/logstash/bin/logstash-plugin install logstash-output-file

然后为输出创建conf文件:

cat /etc/logstash/conf.d/nfs-output.conf

output {
  file {
    path => "/your/path/filebeat-%{+YYYY-MM-dd}.log"
  }
}


cat /etc/logstash/conf.d/30-elasticsearch-output.conf

output {
  elasticsearch {
    hosts => ["elasitc_ip:9200"]
    manage_template => true
    user => "elastic"
    password => "your_password"
  }
}

然后:

service logstash restart
于 2018-03-27T16:37:11.540 回答