1

我最近开始使用 Logstash 从默认的 UDP 514 端口捕获系统日志数据。该程序正确运行并为我提供了我正在寻找的数据,但是,在运行时查看 netstat 时,它会打开服务器上的数千个端口并将它们留在 TIME_WAIT 状态。这很奇怪。我在网上查过,但没有找到对此的解释。

另一个有趣的地方是,所有这些端口似乎都是由 Windows 内核 PID 0 打开的,但实际上它们确实对应于 Logstash 进程,因为它们只在运行时出现,并在关闭 Logstash 后几分钟消失。

所以我的问题是,为什么会发生这种情况,如何防止 Logstash 打开所有这些端口?

下面附上 Logstash 运行期间 netstat 输出的屏幕截图,以及我使用的配置文件。

[屏幕截图:Logstash 运行期间的 netstat 输出(原图未包含在本文中)]

  # LogStash configuration file, to read syslog messages from a port and write them to CSV files.
# Input stage: a single UDP listener that receives the syslog datagrams.
input {
    udp {
        # Standard syslog port.
        port => 514
    }
}
# Filter stage: parse each syslog line into named fields, fall back to
# progressively looser patterns when the strict ones fail, enrich the captured
# IP addresses with GeoIP data, and finally try to set the event timestamp.
filter {
    # Look for records having specific formats.
    # Every pattern below is applied to the "message" field. The list runs from
    # the most specific layouts (two address pairs plus domain\user) down to
    # shorter variants.
    # NOTE(review): presumably the patterns are tried in the order written and
    # the first hit wins (grok's default break_on_match) -- confirm, because the
    # ordering of the more/less specific variants depends on it.
    # NOTE(review): login_from, login_to, domain_from and access_list_name are
    # captured here but are not listed in the csv output's fields below.
    grok {
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action} %{WORD:direction} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} \(%{IP:ip_from_2}/%{NUMBER:port_from_2}\)\(%{WORD:domain_from}\\%{NOTSPACE:user_from}\) to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1} \(%{IP:ip_to_2}/%{NUMBER:port_to_2}\) \(%{NOTSPACE:user_to}\)" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action} %{WORD:direction} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} \(%{IP:ip_from_2}/%{NUMBER:port_from_2}\) to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1} \(%{IP:ip_to_2}/%{NUMBER:port_to_2}\)" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action} %{WORD:direction} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1}" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1}\(%{NOTSPACE:login_from}\) to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1} duration %{NOTSPACE:duration} bytes %{NUMBER:bytes}( \(%{NOTSPACE:login_to}\))?" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1} duration %{NOTSPACE:duration} bytes %{NUMBER:bytes}" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action} dynamic %{WORD:protocol} %{WORD:object} from %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1}( duration %{NOTSPACE:duration})?" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action} %{WORD:protocol} src %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} dst %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1}( by access-group \"%{WORD:access_group}\")?" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action}( %{WORD:direction})? %{WORD:protocol} %{WORD:object} for faddr %{IP:ip_from_1}/%{NUMBER:port_from_1} gaddr %{IP:ip_to_1}/%{NUMBER:port_to_1} laddr %{IP:ip_to_2}/%{NUMBER:port_to_2}" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action} %{WORD:protocol} \(no connection\) from %{IP:ip_from_1}/%{NUMBER:port_from_1} to %{IP:ip_to_1}/%{NUMBER:port_to_1}" }
        match => { "message" => "%{SYSLOGPROG}: %{NOTSPACE} %{NOTSPACE:access_list_name} %{WORD:action} %{WORD:protocol} %{NOTSPACE:label_from}/%{IP:ip_from_1}\(%{NUMBER:port_from_1}\) -> %{NOTSPACE:label_to}/%{IP:ip_to_1}\(%{NUMBER:port_to_1}\) %{GREEDYDATA} \[%{GREEDYDATA}\]" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:action} local-host %{NOTSPACE:label_from}:%{IP:ip_from_1}( duration %{NOTSPACE:duration})?" }
        match => { "message" => "%{SYSLOGPROG}: %{WORD:protocol} %{WORD:object} %{WORD:action} from %{IP:ip_from_1}(/%{NUMBER:port_from_1})? to %{NOTSPACE:label_to}:%{IP:ip_to_1}(/%{NUMBER:port_to_1})?" }

        # The "matched" tag is what the conditionals below test to decide
        # whether a fallback pattern still needs to run.
        add_tag => [ "matched" ]
    }
    # If message doesn't match one of the fully-specified formats, try matching against an unparsed syslog format.
    # Everything after the "<program>: " prefix is kept as message_text.
    if "matched" not in [tags] {
        grok {
            match => { "message" => "%{SYSLOGPROG}: %{GREEDYDATA:message_text}" }
            add_tag => [ "matched" ]
        }
    }
    # If message still doesn't match, just grab the whole thing.
    # This last fallback does not add the "matched" tag.
    if "matched" not in [tags] {
        grok {
            match => { "message" => "%{GREEDYDATA:message_text}" }
        }
    }
    # Try deriving geographic coordinates from the IP addresses, using the LogStash built-in GeoIP database.
    # One lookup per captured address field; each result is stored under its own
    # target field so the four lookups do not overwrite one another.
    # NOTE(review): several grok patterns above do not capture ip_from_2 / ip_to_2
    # (and the fallbacks capture no IPs at all), so these source fields are often
    # absent -- confirm the geoip filter simply skips events without the field.
    geoip {
        source => "ip_from_1"
        target => "geoip_from_1"
#       add_field => [ "[geoip_from_1][latitude]", "%{[geoip_from_1][latitude]}"  ]
#       add_field => [ "[geoip_from_1][longitude]", "%{[geoip_from_1][longitude]}" ]
    }
    geoip {
        source => "ip_from_2"
        target => "geoip_from_2"
#       add_field => [ "[geoip_from_2][latitude]", "%{[geoip_from_2][latitude]}"  ]
#       add_field => [ "[geoip_from_2][longitude]", "%{[geoip_from_2][longitude]}" ]
    }
    geoip {
        source => "ip_to_1"
        target => "geoip_to_1"
#       add_field => [ "[geoip_to_1][latitude]", "%{[geoip_to_1][latitude]}"  ]
#       add_field => [ "[geoip_to_1][longitude]", "%{[geoip_to_1][longitude]}" ]
    }
    geoip {
        source => "ip_to_2"
        target => "geoip_to_2"
#       add_field => [ "[geoip_to_2][latitude]", "%{[geoip_to_2][latitude]}"  ]
#       add_field => [ "[geoip_to_2][longitude]", "%{[geoip_to_2][longitude]}" ]
    }
    # Convert geographic coordinates from text to numeric.
    # (Currently disabled. The seventh entry previously repeated
    # [geoip_to_1][latitude]; it is corrected to [geoip_to_2][latitude] here so
    # the block is right if it is ever re-enabled.)
#   mutate {
#       convert => [ "[geoip_from_1][latitude]", "float"]
#       convert => [ "[geoip_from_1][longitude]", "float"]
#       convert => [ "[geoip_from_2][latitude]", "float"]
#       convert => [ "[geoip_from_2][longitude]", "float"]
#       convert => [ "[geoip_to_1][latitude]", "float"]
#       convert => [ "[geoip_to_1][longitude]", "float"]
#       convert => [ "[geoip_to_2][latitude]", "float"]
#       convert => [ "[geoip_to_2][longitude]", "float"]
#   }
    # Replace the LogStash event timestamp with the value from the message.
    # NOTE(review): none of the grok patterns above names a capture called
    # "timestamp"; unless %{SYSLOGPROG} or the input supplies that field, this
    # filter has nothing to parse and @timestamp stays at receive time -- confirm.
    date {
        match => ["timestamp", "ISO8601"]
    }
}
# Output stage: append every event as one CSV row.
output {
    csv {
        # The minute-resolution date pattern in the file name starts a new file
        # every minute, so files can be loaded quickly into the data warehouse.
        path => "D:\ETLFiles\IT\In\syslog_%{+YYYYMMddHHmm}.csv"
        # Column order of the CSV; grouped here by event header, source side,
        # destination side, and trailing details.
        fields => [
            "@timestamp", "host", "program",
            "action", "direction", "protocol", "object",
            "label_from", "ip_from_1", "port_from_1", "user_from",
            "[geoip_from_1][country_code3]", "[geoip_from_1][city_name]", "[geoip_from_1][postal_code]",
            "[geoip_from_1][latitude]", "[geoip_from_1][longitude]",
            "ip_from_2", "port_from_2",
            "[geoip_from_2][country_code3]", "[geoip_from_2][city_name]", "[geoip_from_2][postal_code]",
            "[geoip_from_2][latitude]", "[geoip_from_2][longitude]",
            "label_to", "ip_to_1", "port_to_1", "user_to",
            "[geoip_to_1][country_code3]", "[geoip_to_1][city_name]", "[geoip_to_1][postal_code]",
            "[geoip_to_1][latitude]", "[geoip_to_1][longitude]",
            "ip_to_2", "port_to_2",
            "[geoip_to_2][country_code3]", "[geoip_to_2][city_name]", "[geoip_to_2][postal_code]",
            "[geoip_to_2][latitude]", "[geoip_to_2][longitude]",
            "duration", "bytes", "access_group", "message_text"
        ]
    }
}
4

0 回答 0