2

我有 Filebeat、Logstash、ElasticSearch 和 Kibana。Filebeat 位于单独的服务器上,它应该接收不同格式的数据:syslog、json、来自数据库等,并将其发送到 Logstash。

我知道如何设置 Logstash 以使其处理单一格式,但由于有多种数据格式,我将如何配置 Logstash 以正确处理每种数据格式?

事实上,我该如何设置它们,Logstash 和 Filebeat,以便所有不同格式的数据都从 Filebeat 发送并正确提交到 Logstash?我的意思是,处理发送和接收数据的配置设置。

4

3 回答 3

4

要在 Logstash 管道中区分不同类型的输入,请使用该type字段并tags进行更多识别。

在您的 Filebeat 配置中,您应该为每种不同的数据格式使用不同的prospector,然后可以将每个prospector 设置为具有不同的document_type:字段。

参考

例如:

filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    -
      # Paths that should be crawled and fetched. Glob based paths.
      # For each file found under this path, a harvester is started.
      paths:
        - "/var/log/apache/httpd-*.log"
      # Type to be published in the 'type' field. For Elasticsearch output,
      # the type defines the document type these entries should be stored
      # in. Default: log
      document_type: apache
    -
      paths:
        - /var/log/messages
        - "/var/log/*.log"
      document_type: log_message

在上面的示例中,来自的日志/var/log/apache/httpd-*.log将具有document_type: apache,而其他探矿者具有document_type: log_message

document-type字段成为typeLogstash 处理事件时的字段。然后,您可以使用ifLogstash 中的语句对不同的类型进行不同的处理。

参考

例如:

filter {
  if [type] == "apache" {
    # apache specific processing
  }
  else if [type] == "log_message" {
    # log_message processing
  }
}
于 2016-06-07T16:33:15.093 回答
1

如果您问题中的“数据格式”是编解码器,则必须在 logstash 的输入中进行配置。以下是关于 filebeat 1.x 和 logstash 2.x,而不是 elastic 5 堆栈。在我们的设置中,我们有两个节拍输入 - 第一个是 default = "plain":

beats {
    port                => 5043
}
beats {
    port                => 5044
    codec               => "json"
}

在 filebeat 方面,我们需要两个 filebeat 实例,将它们的输出发送到各自的端​​口。不可能告诉 filebeat“将这个探矿者路由到那个输出”。

文档logstash:https ://www.elastic.co/guide/en/logstash/2.4/plugins-inputs-beats.html

备注:如果您使用不同的协议,例如旧的 logstash-forwarder / lumberjack,您需要更多端口。

于 2016-11-27T11:03:19.500 回答
1

支持 7.5.1

filebeat-multifile.yml // 机器上安装的文件beat

filebeat.inputs:
- type: log
  tags: ["gunicorn"]
  paths:
    - "/home/hduser/Data/gunicorn-100.log"

- type: log
  tags: ["apache"]
  paths:
    - "/home/hduser/Data/apache-access-100.log"

output.logstash:
  hosts: ["0.0.0.0:5044"] // target logstash IP

gunicorn-apache-log.conf // 安装在另一台机器上的日志存储

input {
  beats {
    port => "5044"
    host => "0.0.0.0" 
  }
}

filter {
    if "gunicorn" in [tags] {
        grok {
            match => { "message" => "%{USERNAME:u1} %{USERNAME:u2} \[%{HTTPDATE:http_date}\] \"%{DATA:http_verb} %{URIPATHPARAM:api} %{DATA:http_version}\" %{NUMBER:status_code} %{NUMBER:byte} \"%{DATA:external_api}\" \"%{GREEDYDATA:android_client}\"" }
            remove_field => "message"
        }
    }
    else if "apache" in [tags] {
        grok {
            match => { "message" => "%{IPORHOST:client_ip} %{DATA:u1} %{DATA:u2} \[%{HTTPDATE:http_date}\] \"%{WORD:http_method} %{URIPATHPARAM:api} %{DATA:http_version}\" %{NUMBER:status_code} %{NUMBER:byte} \"%{DATA:external_api}\" \"%{GREEDYDATA:gd}\" \"%{DATA:u3}\""}
            remove_field => "message"
        }

    }
}

output {
    if "gunicorn" in [tags]{
        stdout { codec => rubydebug }

        elasticsearch {
        hosts => [...]
        index => "gunicorn-index"
        }

    }
    else if "apache" in [tags]{
        stdout { codec => rubydebug }

        elasticsearch {
             hosts => [...]
             index => "apache-index"
        }
    }
}

从二进制文件运行 filebeat 给文件适当的权限

sudo chown root:root filebeat-multifile.yml
sudo chmod go-w filebeat-multifile.yml
sudo ./filebeat -e -c filebeat-multifile-1.yml -d "publish"

从二进制文件运行logstash

./bin/logstash -f gunicorn-apache-log.conf
于 2020-01-07T10:41:33.727 回答