1

我已经在 centos 7 上设置了一个 ELK 堆栈,并且正在从运行 bro 的 freebsd 11 主机转发日志。但是,我的过滤器无法正确解析 bro 日志。

这是当前的设置:

freebsd filebeat 客户端

文件节拍.yml

filebeat:
registry_file: /var/run/.filebeat
prospectors:
-
  paths:
    - /var/log/messages
    - /var/log/maillog
    - /var/log/auth.log
    - /var/log/cron
    - /var/log/debug.log
    - /var/log/devd.log
    - /var/log/ppp.log
    - /var/log/netatalk.log
    - /var/log/setuid.today
    - /var/log/utx.log
    - /var/log/rkhunter.log
    - /var/log/userlog
    - /var/log/sendmail.st
    - /var/log/xferlog
  input_type: log
  document_type: syslog

-
  paths:
    - /var/log/bro/current/app_stats.log
  input_type: log
  document_type: bro_app_stats

-
  paths:
    - /var/log/bro/current/communication.log
  input_type: log
  document_type: bro_communication

-
  paths:
    - /var/log/bro/current/conn.log
  input_type: log
  document_type: bro_conn

-
  paths:
    - /var/log/bro/current/dhcp.log
  input_type: log
  document_type: bro_dhcp

-
  paths:
    - /var/log/bro/current/dns.log
  input_type: log
  document_type: bro_dns

-
  paths:
    - /var/log/bro/current/dpd.log
  input_type: log
  document_type: bro_dpd

-
  paths:
    - /var/log/bro/current/files.log
  input_type: log
  document_type: bro_files

-
  paths:
    - /var/log/bro/current/ftp.log
  input_type: log
  document_type: bro_ftp

-
  paths:
    - /var/log/bro/current/http.log
  input_type: log
  document_type: bro_http

-
  paths:
    - /var/log/bro/current/known_certs.log
  input_type: log
  document_type: bro_app_known_certs

-
  paths:
    - /var/log/bro/current/known_hosts.log
  input_type: log
  document_type: bro_known_hosts

-
  paths:
    - /var/log/bro/current/known_services.log
  input_type: log
  document_type: bro_known_services

-
  paths:
    - /var/log/bro/current/notice.log
  input_type: log
  document_type: bro_notice

-
  paths:
    - /var/log/bro/current/smtp.log
  input_type: log
  document_type: bro_smtp

-
  paths:
    - /var/log/bro/current/software.log
  input_type: log
  document_type: bro_software

-
  paths:
    - /var/log/bro/current/ssh.log
  input_type: log
  document_type: bro_ssh

-
  paths:
    - /var/log/bro/current/ssl.log
  input_type: log
  document_type: bro_ssl

-
  paths:
    - /var/log/bro/current/weird.log
  input_type: log
  document_type: bro_weird

-
  paths:
    - /var/log/bro/current/x509.log
  input_type: log
  document_type: bro_x509

然后在 centos ELK 服务器上我有 4 个配置:

/etc/logstash/conf.d/02-beats-input.conf

input {
  beats {
    port => 5044
  }
}

/etc/logstash/conf.d/10-syslog-filter.conf

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
  add_field => [ "received_at", "%{@timestamp}" ]
  add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

/etc/logstash/conf.d/20-bro-ids-filter.conf

filter {
    # bro_app_stats ######################
      if [type] == "bro_app" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<ts_delta>(.*?))\t(?<app>(.*?))\t(?<uniq_hosts>(.*?))\t(?<hits>(.*?))\t(?<bytes>(.*))" ]
        }
      }

    # bro_conn ######################
      if [type] == "bro_conn" {
        grok {
            match => [ 
                "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<proto>(.*?))\t(?<service>(.*?))\t(?<duration>(.*?))\t(?<orig_bytes>(.*?))\t(?<resp_bytes>(.*?))\t(?<conn_state>(.*?))\t(?<local_orig>(.*?))\t(?<missed_bytes>(.*?))\t(?<history>(.*?))\t(?<orig_pkts>(.*?))\t(?<orig_ip_bytes>(.*?))\t(?<resp_pkts>(.*?))\t(?<resp_ip_bytes>(.*?))\t(?<tunnel_parents>(.*?))\t(?<orig_cc>(.*?))\t(?<resp_cc>(.*?))\t(?<sensorname>(.*))",
                "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<proto>(.*?))\t(?<service>(.*?))\t(?<duration>(.*?))\t(?<orig_bytes>(.*?))\t(?<resp_bytes>(.*?))\t(?<conn_state>(.*?))\t(?<local_orig>(.*?))\t(?<missed_bytes>(.*?))\t(?<history>(.*?))\t(?<orig_pkts>(.*?))\t(?<orig_ip_bytes>(.*?))\t(?<resp_pkts>(.*?))\t(?<resp_ip_bytes>(.*?))\t(%{NOTSPACE:tunnel_parents})"
            ]
        }
      }

    # bro_notice ######################
      if [type] == "bro_notice" {
        grok { 
          match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<fuid>(.*?))\t(?<file_mime_type>(.*?))\t(?<file_desc>(.*?))\t(?<proto>(.*?))\t(?<note>(.*?))\t(?<msg>(.*?))\t(?<sub>(.*?))\t(?<src>(.*?))\t(?<dst>(.*?))\t(?<p>(.*?))\t(?<n>(.*?))\t(?<peer_descr>(.*?))\t(?<actions>(.*?))\t(?<suppress_for>(.*?))\t(?<dropped>(.*?))\t(?<remote_location.country_code>(.*?))\t(?<remote_location.region>(.*?))\t(?<remote_location.city>(.*?))\t(?<remote_location.latitude>(.*?))\t(?<remote_location.longitude>(.*))" ]
        }
      }


    # bro_dhcp ######################
      if [type] == "bro_dhcp" {
        grok { 
          match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<mac>(.*?))\t(?<assigned_ip>(.*?))\t(?<lease_time>(.*?))\t(?<trans_id>(.*))" ]
        }
      }

    # bro_dns ######################
      if [type] == "bro_dns" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<proto>(.*?))\t(?<trans_id>(.*?))\t(?<query>(.*?))\t(?<qclass>(.*?))\t(?<qclass_name>(.*?))\t(?<qtype>(.*?))\t(?<qtype_name>(.*?))\t(?<rcode>(.*?))\t(?<rcode_name>(.*?))\t(?<AA>(.*?))\t(?<TC>(.*?))\t(?<RD>(.*?))\t(?<RA>(.*?))\t(?<Z>(.*?))\t(?<answers>(.*?))\t(?<TTLs>(.*?))\t(?<rejected>(.*))" ]
        }
      }

    # bro_software ######################
      if [type] == "bro_software" {
        grok { 
          match => [ "message", "(?<ts>(.*?))\t(?<bro_host>(.*?))\t(?<host_p>(.*?))\t(?<software_type>(.*?))\t(?<name>(.*?))\t(?<version.major>(.*?))\t(?<version.minor>(.*?))\t(?<version.minor2>(.*?))\t(?<version.minor3>(.*?))\t(?<version.addl>(.*?))\t(?<unparsed_version>(.*))" ]
        }
      }

    # bro_dpd ######################
      if [type] == "bro_dpd" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<proto>(.*?))\t(?<analyzer>(.*?))\t(?<failure_reason>(.*))" ]
        }
      }

    # bro_files ######################
      if [type] == "bro_files" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<fuid>(.*?))\t(?<tx_hosts>(.*?))\t(?<rx_hosts>(.*?))\t(?<conn_uids>(.*?))\t(?<source>(.*?))\t(?<depth>(.*?))\t(?<analyzers>(.*?))\t(?<mime_type>(.*?))\t(?<filename>(.*?))\t(?<duration>(.*?))\t(?<local_orig>(.*?))\t(?<is_orig>(.*?))\t(?<seen_bytes>(.*?))\t(?<total_bytes>(.*?))\t(?<missing_bytes>(.*?))\t(?<overflow_bytes>(.*?))\t(?<timedout>(.*?))\t(?<parent_fuid>(.*?))\t(?<md5>(.*?))\t(?<sha1>(.*?))\t(?<sha256>(.*?))\t(?<extracted>(.*))" ]
        }
      }

    # bro_http ######################
      if [type] == "bro_http" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<trans_depth>(.*?))\t(?<method>(.*?))\t(?<bro_host>(.*?))\t(?<uri>(.*?))\t(?<referrer>(.*?))\t(?<user_agent>(.*?))\t(?<request_body_len>(.*?))\t(?<response_body_len>(.*?))\t(?<status_code>(.*?))\t(?<status_msg>(.*?))\t(?<info_code>(.*?))\t(?<info_msg>(.*?))\t(?<filename>(.*?))\t(?<http_tags>(.*?))\t(?<username>(.*?))\t(?<password>(.*?))\t(?<proxied>(.*?))\t(?<orig_fuids>(.*?))\t(?<orig_mime_types>(.*?))\t(?<resp_fuids>(.*?))\t(?<resp_mime_types>(.*))" ]
        }
      }

    # bro_known_certs ######################
      if [type] == "bro_known_certs" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<bro_host>(.*?))\t(?<port_num>(.*?))\t(?<subject>(.*?))\t(?<issuer_subject>(.*?))\t(?<serial>(.*))" ]
        }
      }

    # bro_known_hosts ######################
      if [type] == "bro_known_hosts" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<bro_host>(.*))" ]
        }
      }

    # bro_known_services ######################
      if [type] == "bro_known_services" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<bro_host>(.*?))\t(?<port_num>(.*?))\t(?<port_proto>(.*?))\t(?<service>(.*))" ]
        }
      }

    # bro_ssh ######################
      if [type] == "bro_ssh" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<status>(.*?))\t(?<direction>(.*?))\t(?<client>(.*?))\t(?<server>(.*?))\t(?<remote_location.country_code>(.*?))\t(?<remote_location.region>(.*?))\t(?<remote_location.city>(.*?))\t(?<remote_location.latitude>(.*?))\t(?<remote_location.longitude>(.*))" ]
        }
      }

    # bro_ssl ######################
      if [type] == "bro_ssl" {
        grok {
          match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<version>(.*?))\t(?<cipher>(.*?))\t(?<server_name>(.*?))\t(?<session_id>(.*?))\t(?<subject>(.*?))\t(?<issuer_subject>(.*?))\t(?<not_valid_before>(.*?))\t(?<not_valid_after>(.*?))\t(?<last_alert>(.*?))\t(?<client_subject>(.*?))\t(?<client_issuer_subject>(.*?))\t(?<cert_hash>(.*?))\t(?<validation_status>(.*))" ]
        }
      }

    # bro_weird ######################
    if [type] == "bro_weird" {
        grok {
            match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<name>(.*?))\t(?<addl>(.*?))\t(?<notice>(.*?))\t(?<peer>(.*))" ]
            }
    }

    # bro_x509 #######################
    if [type] == "bro_x509" {
        csv {

          #x509.log:#fields ts  id  certificate.version certificate.serial  certificate.subject certificate.issuer  certificate.not_valid_before    certificate.not_valid_after certificate.key_alg certificate.sig_alg certificate.key_type    certificate.key_length  certificate.exponent    certificate.curve   san.dns san.uri san.email   san.ip  basic_constraints.ca    basic_constraints.path_len
          columns => ["ts","id","certificate.version","certificate.serial","certificate.subject","icertificate.issuer","certificate.not_valid_before","certificate.not_valid_after","certificate.key_alg","certificate.sig_alg","certificate.key_type","certificate.key_length","certificate.exponent","certificate.curve","san.dns","san.uri","san.email","san.ip","basic_constraints.ca","basic_constraints.path_len"]

          #If you use a custom delimiter, change the following value in between the quotes to your delimiter. Otherwise, leave the next line alone.
          separator => "    "
        }

        #Let's convert our timestamp into the 'ts' field, so we can use Kibana features natively
        date {
          match => [ "ts", "UNIX" ]
        }

      }

    if [type]== "bro_intel" {
      grok {
        match => [ "message", "(?<ts>(.*?))\t%{DATA:uid}\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t%{DATA:fuid}\t%{DATA:file_mime_type}\t%{DATA:file_desc}\t(?<seen.indicator>(.*?))\t(?<seen.indicator_type>(.*?))\t(?<seen.where>(.*?))\t%{NOTSPACE:sources}" ]
     }
   }
  }
  date {
    match => [ "ts", "UNIX" ]
  }
}

filter {
  if "bro" in [type] {
    if [id.orig_h] {
      mutate {
        add_field => [ "senderbase_lookup", "http://www.senderbase.org/lookup/?search_string=%{id.orig_h}" ]
        add_field => [ "CBL_lookup", "http://cbl.abuseat.org/lookup.cgi?ip=%{id.orig_h}" ]
        add_field => [ "Spamhaus_lookup", "http://www.spamhaus.org/query/bl?ip=%{id.orig_h}" ]
      }
    }
    mutate {
      add_tag => [ "bro" ]
    }
    mutate {
      convert => [ "id.orig_p", "integer" ]
      convert => [ "id.resp_p", "integer" ]
      convert => [ "orig_bytes", "integer" ]
      convert => [ "resp_bytes", "integer" ]
      convert => [ "missed_bytes", "integer" ]
      convert => [ "orig_pkts", "integer" ]
      convert => [ "orig_ip_bytes", "integer" ]
      convert => [ "resp_pkts", "integer" ]
      convert => [ "resp_ip_bytes", "integer" ]
    }
  }
}

filter {
  if [type] == "bro_conn" {
    #The following makes use of the translate filter (stash contrib) to convert conn_state into human text. Saves having to look up values for packet introspection
    translate {
      field => "conn_state"
      destination => "conn_state_full"
      dictionary => [ 
        "S0", "Connection attempt seen, no reply",
        "S1", "Connection established, not terminated",
        "S2", "Connection established and close attempt by originator seen (but no reply from responder)",
        "S3", "Connection established and close attempt by responder seen (but no reply from originator)",
        "SF", "Normal SYN/FIN completion",
        "REJ", "Connection attempt rejected",
        "RSTO", "Connection established, originator aborted (sent a RST)",
        "RSTR", "Established, responder aborted",
        "RSTOS0", "Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder",
        "RSTRH", "Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator",
        "SH", "Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)",
        "SHR", "Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator",
        "OTH", "No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)" 
      ]
    }
  }
}
# Resolve @source_host to FQDN if possible if missing for some types of ging using source_host_ip from above
filter {
  if [id.orig_h] {
    if ![id.orig_h-resolved] {
      mutate {
        add_field => [ "id.orig_h-resolved", "%{id.orig_h}" ]
      }
      dns {
        reverse => [ "id.orig_h-resolved" ]
        action => "replace"
      }
    }
  }
}
filter {
  if [id.resp_h] {
    if ![id.resp_h-resolved] {
      mutate {
        add_field => [ "id.resp_h-resolved", "%{id.resp_h}" ]
      }
      dns {
        reverse => [ "id.resp_h-resolved" ]
        action => "replace"
      }
    }
  }
}

和 /etc/logstash/conf.d/30-elasticsearch-output.conf

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

我利用了这个要点并根据我的配置对其进行了定制。运行时,我在 /var/log/logstash/logstash-plain.log 中收到以下错误:

[2016-11-06T15:30:36,961][ERROR][logstash.agent           ] ########\n\t  if [type] == \"bro_dhcp\" {\n\t\tgrok { \n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<uid>(.*?))\\t(?<id.orig_h>(.*?))\\t(?<id.orig_p>(.*?))\\t(?<id.resp_h>(.*?))\\t(?<id.resp_p>(.*?))\\t(?<mac>(.*?))\\t(?<assigned_ip>(.*?))\\t(?<lease_time>(.*?))\\t(?<trans_id>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_dns ######################\n\t  if [type] == \"bro_dns\" {\n\t\tgrok {\n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<uid>(.*?))\\t(?<id.orig_h>(.*?))\\t(?<id.orig_p>(.*?))\\t(?<id.resp_h>(.*?))\\t(?<id.resp_p>(.*?))\\t(?<proto>(.*?))\\t(?<trans_id>(.*?))\\t(?<query>(.*?))\\t(?<qclass>(.*?))\\t(?<qclass_name>(.*?))\\t(?<qtype>(.*?))\\t(?<qtype_name>(.*?))\\t(?<rcode>(.*?))\\t(?<rcode_name>(.*?))\\t(?<AA>(.*?))\\t(?<TC>(.*?))\\t(?<RD>(.*?))\\t(?<RA>(.*?))\\t(?<Z>(.*?))\\t(?<answers>(.*?))\\t(?<TTLs>(.*?))\\t(?<rejected>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_software ######################\n\t  if [type] == \"bro_software\" {\n\t\tgrok { \n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<bro_host>(.*?))\\t(?<host_p>(.*?))\\t(?<software_type>(.*?))\\t(?<name>(.*?))\\t(?<version.major>(.*?))\\t(?<version.minor>(.*?))\\t(?<version.minor2>(.*?))\\t(?<version.minor3>(.*?))\\t(?<version.addl>(.*?))\\t(?<unparsed_version>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_dpd ######################\n\t  if [type] == \"bro_dpd\" {\n\t\tgrok {\n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<uid>(.*?))\\t(?<id.orig_h>(.*?))\\t(?<id.orig_p>(.*?))\\t(?<id.resp_h>(.*?))\\t(?<id.resp_p>(.*?))\\t(?<proto>(.*?))\\t(?<analyzer>(.*?))\\t(?<failure_reason>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_files ######################\n\t  if [type] == \"bro_files\" {\n\t\tgrok {\n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<fuid>(.*?))\\t(?<tx_hosts>(.*?))\\t(?<rx_hosts>(.*?))\\t(?<conn_uids>(.*?))\\t(?<source>(.*?))\\t(?<depth>(.*?))\\t(?<analyzers>(.*?))\\t(?<mime_type>(.*?))\\t(?<filename>(.*?))\\t(?<duration>(.*?))\\t(?<local_orig>(.*?))\\t(?<is_orig>(.*?))\\t(?<seen_bytes>(.*?))\\t(?<total_bytes>(.*?))\\t(?<missing_bytes>(.*?))\\t(?<overflow_bytes>(.*?))\\t(?<timedout>(.*?))\\t(?<parent_fuid>(.*?))\\t(?<md5>(.*?))\\t(?<sha1>(.*?))\\t(?<sha256>(.*?))\\t(?<extracted>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_http ######################\n\t  if [type] == \"bro_http\" {\n\t\tgrok {\n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<uid>(.*?))\\t(?<id.orig_h>(.*?))\\t(?<id.orig_p>(.*?))\\t(?<id.resp_h>(.*?))\\t(?<id.resp_p>(.*?))\\t(?<trans_depth>(.*?))\\t(?<method>(.*?))\\t(?<bro_host>(.*?))\\t(?<uri>(.*?))\\t(?<referrer>(.*?))\\t(?<user_agent>(.*?))\\t(?<request_body_len>(.*?))\\t(?<response_body_len>(.*?))\\t(?<status_code>(.*?))\\t(?<status_msg>(.*?))\\t(?<info_code>(.*?))\\t(?<info_msg>(.*?))\\t(?<filename>(.*?))\\t(?<http_tags>(.*?))\\t(?<username>(.*?))\\t(?<password>(.*?))\\t(?<proxied>(.*?))\\t(?<orig_fuids>(.*?))\\t(?<orig_mime_types>(.*?))\\t(?<resp_fuids>(.*?))\\t(?<resp_mime_types>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_known_certs ######################\n\t  if [type] == \"bro_known_certs\" {\n\t\tgrok {\n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<bro_host>(.*?))\\t(?<port_num>(.*?))\\t(?<subject>(.*?))\\t(?<issuer_subject>(.*?))\\t(?<serial>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_known_hosts ######################\n\t  if [type] == \"bro_known_hosts\" {\n\t\tgrok {\n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<bro_host>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_known_services ######################\n\t  if [type] == \"bro_known_services\" {\n\t\tgrok {\n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<bro_host>(.*?))\\t(?<port_num>(.*?))\\t(?<port_proto>(.*?))\\t(?<service>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_ssh ######################\n\t  if [type] == \"bro_ssh\" {\n\t\tgrok {\n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<uid>(.*?))\\t(?<id.orig_h>(.*?))\\t(?<id.orig_p>(.*?))\\t(?<id.resp_h>(.*?))\\t(?<id.resp_p>(.*?))\\t(?<status>(.*?))\\t(?<direction>(.*?))\\t(?<client>(.*?))\\t(?<server>(.*?))\\t(?<remote_location.country_code>(.*?))\\t(?<remote_location.region>(.*?))\\t(?<remote_location.city>(.*?))\\t(?<remote_location.latitude>(.*?))\\t(?<remote_location.longitude>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_ssl ######################\n\t  if [type] == \"bro_ssl\" {\n\t\tgrok {\n\t\t  match => [ \"message\", \"(?<ts>(.*?))\\t(?<uid>(.*?))\\t(?<id.orig_h>(.*?))\\t(?<id.orig_p>(.*?))\\t(?<id.resp_h>(.*?))\\t(?<id.resp_p>(.*?))\\t(?<version>(.*?))\\t(?<cipher>(.*?))\\t(?<server_name>(.*?))\\t(?<session_id>(.*?))\\t(?<subject>(.*?))\\t(?<issuer_subject>(.*?))\\t(?<not_valid_before>(.*?))\\t(?<not_valid_after>(.*?))\\t(?<last_alert>(.*?))\\t(?<client_subject>(.*?))\\t(?<client_issuer_subject>(.*?))\\t(?<cert_hash>(.*?))\\t(?<validation_status>(.*))\" ]\n\t\t}\n\t  }\n\n\t# bro_weird ######################\n\tif [type] == \"bro_weird\" {\n\t\tgrok {\n\t\t\tmatch => [ \"message\", \"(?<ts>(.*?))\\t(?<uid>(.*?))\\t(?<id.orig_h>(.*?))\\t(?<id.orig_p>(.*?))\\t(?<id.resp_h>(.*?))\\t(?<id.resp_p>(.*?))\\t(?<name>(.*?))\\t(?<addl>(.*?))\\t(?<notice>(.*?))\\t(?<peer>(.*))\" ]\n\t\t\t}\n\t}\n\t\n\t# bro_x509 #######################\n\tif [type] == \"bro_x509\" {\n\t\tcsv {\n\t\n\t\t  #x509.log:#fields\tts\tid\tcertificate.version\tcertificate.serial\tcertificate.subject\tcertificate.issuer\tcertificate.not_valid_before\tcertificate.not_valid_after\tcertificate.key_alg\tcertificate.sig_alg\tcertificate.key_type\tcertificate.key_length\tcertificate.exponent\tcertificate.curve\tsan.dns\tsan.uri\tsan.email\tsan.ip\tbasic_constraints.ca\tbasic_constraints.path_len\n\t\t  columns => [\"ts\",\"id\",\"certificate.version\",\"certificate.serial\",\"certificate.subject\",\"icertificate.issuer\",\"certificate.not_valid_before\",\"certificate.not_valid_after\",\"certificate.key_alg\",\"certificate.sig_alg\",\"certificate.key_type\",\"certificate.key_length\",\"certificate.exponent\",\"certificate.curve\",\"san.dns\",\"san.uri\",\"san.email\",\"san.ip\",\"basic_constraints.ca\",\"basic_constraints.path_len\"]\n\t\n\t\t  #If you use a custom delimiter, change the following value in between the quotes to your delimiter. Otherwise, leave the next line alone.\n\t\t  separator => \"\t\"\n\t\t}\n\t\n\t\t#Let's convert our timestamp into the 'ts' field, so we can use Kibana features natively\n\t\tdate {\n\t\t  match => [ \"ts\", \"UNIX\" ]\n\t\t}\n\t\n\t  }\n\t\n\tif [type]== \"bro_intel\" {\n\t  grok {\n\t\tmatch => [ \"message\", \"(?<ts>(.*?))\\t%{DATA:uid}\\t(?<id.orig_h>(.*?))\\t(?<id.orig_p>(.*?))\\t(?<id.resp_h>(.*?))\\t(?<id.resp_p>(.*?))\\t%{DATA:fuid}\\t%{DATA:file_mime_type}\\t%{DATA:file_desc}\\t(?<seen.indicator>(.*?))\\t(?<seen.indicator_type>(.*?))\\t(?<seen.where>(.*?))\\t%{NOTSPACE:sources}\" ]\n\t }\n   }\n  }\n  date {\n\tmatch => [ \"ts\", \"UNIX\" ]\n  }\n}\n\nfilter {\n  if \"bro\" in [type] {\n\tif [id.orig_h] {\n\t  mutate {\n\t\tadd_field => [ \"senderbase_lookup\", \"http://www.senderbase.org/lookup/?search_string=%{id.orig_h}\" ]\n\t\tadd_field => [ \"CBL_lookup\", \"http://cbl.abuseat.org/lookup.cgi?ip=%{id.orig_h}\" ]\n\t\tadd_field => [ \"Spamhaus_lookup\", \"http://www.spamhaus.org/query/bl?ip=%{id.orig_h}\" ]\n\t  }\n\t}\n\tmutate {\n\t  add_tag => [ \"bro\" ]\n\t}\n\tmutate {\n\t  convert => [ \"id.orig_p\", \"integer\" ]\n\t  convert => [ \"id.resp_p\", \"integer\" ]\n\t  convert => [ \"orig_bytes\", \"integer\" ]\n\t  convert => [ \"resp_bytes\", \"integer\" ]\n\t  convert => [ \"missed_bytes\", \"integer\" ]\n\t  convert => [ \"orig_pkts\", \"integer\" ]\n\t  convert => [ \"orig_ip_bytes\", \"integer\" ]\n\t  convert => [ \"resp_pkts\", \"integer\" ]\n\t  convert => [ \"resp_ip_bytes\", \"integer\" ]\n\t}\n  }\n}\n\nfilter {\n  if [type] == \"bro_conn\" {\n\t#The following makes use of the translate filter (stash contrib) to convert conn_state into human text. Saves having to look up values for packet introspection\n\ttranslate {\n\t  field => \"conn_state\"\n\t  destination => \"conn_state_full\"\n\t  dictionary => [ \n\t\t\"S0\", \"Connection attempt seen, no reply\",\n\t\t\"S1\", \"Connection established, not terminated\",\n\t\t\"S2\", \"Connection established and close attempt by originator seen (but no reply from responder)\",\n\t\t\"S3\", \"Connection established and close attempt by responder seen (but no reply from originator)\",\n\t\t\"SF\", \"Normal SYN/FIN completion\",\n\t\t\"REJ\", \"Connection attempt rejected\",\n\t\t\"RSTO\", \"Connection established, originator aborted (sent a RST)\",\n\t\t\"RSTR\", \"Established, responder aborted\",\n\t\t\"RSTOS0\", \"Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder\",\n\t\t\"RSTRH\", \"Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator\",\n\t\t\"SH\", \"Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)\",\n\t\t\"SHR\", \"Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator\",\n\t\t\"OTH\", \"No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)\" \n\t  ]\n\t}\n  }\n}\n# Resolve @source_host to FQDN if possible if missing for some types of ging using source_host_ip from above\nfilter {\n  if [id.orig_h] {\n\tif ![id.orig_h-resolved] {\n\t  mutate {\n\t\tadd_field => [ \"id.orig_h-resolved\", \"%{id.orig_h}\" ]\n\t  }\n\t  dns {\n\t\treverse => [ \"id.orig_h-resolved\" ]\n\t\taction => \"replace\"\n\t  }\n\t}\n  }\n}\nfilter {\n  if [id.resp_h] {\n\tif ![id.resp_h-resolved] {\n\t  mutate {\n\t\tadd_field => [ \"id.resp_h-resolved\", \"%{id.resp_h}\" ]\n\t  }\n\t  dns {\n\t\treverse => [ \"id.resp_h-resolved\" ]\n\t\taction => \"replace\"\n\t  }\n\t}\n  }\n}\n\noutput {\n  elasticsearch {\n    hosts => [\"localhost:9200\"]\n    #sniffing => true\n    manage_template => false\n    index => \"%{[@metadata][beat]}-%{+YYYY.MM.dd}\"\n    document_type => \"%{[@metadata][type]}\"\n  }\n}\n\n", :reason=>"Expected one of #, input, filter, output at line 158, column 3 (byte 8746) after "}

我尽我所能查看了我的 logstash 配置,但看不到任何错误。谁能帮我弄清楚它有什么问题?

我正在运行 logstash.noarch 1:5.0.0-1 @elasticsearch elasticsearch.noarch 5.0.0-1 @elasticsearch

非常感谢

4

1 回答 1

1

如果您匹配 20-bro-ids-filter.conf 顶部的左花括号,您会看到它与您的 date{} 节之前的右花括号匹配。这会将 date{} 置于过滤器{} 之外,生成它期望输入{}、输出{} 或过滤器{} 的消息。

于 2016-11-07T05:49:40.523 回答