我最近开始使用 Logstash 从默认的 UDP 514 端口捕获系统日志数据。程序运行正常,也输出了我需要的数据,但是在它运行期间用 netstat 查看时,我发现服务器上有数千个端口被打开,并且停留在 TIME_WAIT 状态。这很奇怪。我在网上查过,但没有找到对此的解释。
另一个有趣的地方是,所有这些端口似乎都是由 Windows 内核 PID 0 打开的,但实际上它们确实对应于 Logstash 进程,因为它们只在运行时出现,并在关闭 Logstash 后几分钟消失。
所以我的问题是,为什么会发生这种情况,如何防止 Logstash 打开所有这些端口?
下面附上 Logstash 运行时 netstat 输出的屏幕截图,以及我使用的配置文件。
# LogStash configuration file, to read syslog messages from a port and write them to CSV files.
input {
  # Listen for incoming syslog datagrams on UDP port 514.
  udp {
    port => 514
  }
}
filter {
  # Stage 1: try the fully-specified record formats.
  # All patterns are listed in a single array under one "message" key; grok
  # tries them in the order written. On a match the event is tagged "matched"
  # so the fallback stages below are skipped.
  grok {
    match => {
      "message" => [
        "%{SYSLOGPROG}: %{WORD:action} %{WORD:direction} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} \(%{IP:ip_from_2}/%{NUMBER:port_from_2}\)\(%{WORD:domain_from}\\%{NOTSPACE:user_from}\) to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1} \(%{IP:ip_to_2}/%{NUMBER:port_to_2}\) \(%{NOTSPACE:user_to}\)",
        "%{SYSLOGPROG}: %{WORD:action} %{WORD:direction} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} \(%{IP:ip_from_2}/%{NUMBER:port_from_2}\) to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1} \(%{IP:ip_to_2}/%{NUMBER:port_to_2}\)",
        "%{SYSLOGPROG}: %{WORD:action} %{WORD:direction} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1}",
        "%{SYSLOGPROG}: %{WORD:action} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1}\(%{NOTSPACE:login_from}\) to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1} duration %{NOTSPACE:duration} bytes %{NUMBER:bytes}( \(%{NOTSPACE:login_to}\))?",
        "%{SYSLOGPROG}: %{WORD:action} %{WORD:protocol} %{WORD:object} %{NUMBER} for %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1} duration %{NOTSPACE:duration} bytes %{NUMBER:bytes}",
        "%{SYSLOGPROG}: %{WORD:action} dynamic %{WORD:protocol} %{WORD:object} from %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} to %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1}( duration %{NOTSPACE:duration})?",
        "%{SYSLOGPROG}: %{WORD:action} %{WORD:protocol} src %{NOTSPACE:label_from}:%{IP:ip_from_1}/%{NUMBER:port_from_1} dst %{NOTSPACE:label_to}:%{IP:ip_to_1}/%{NUMBER:port_to_1}( by access-group \"%{WORD:access_group}\")?",
        "%{SYSLOGPROG}: %{WORD:action}( %{WORD:direction})? %{WORD:protocol} %{WORD:object} for faddr %{IP:ip_from_1}/%{NUMBER:port_from_1} gaddr %{IP:ip_to_1}/%{NUMBER:port_to_1} laddr %{IP:ip_to_2}/%{NUMBER:port_to_2}",
        "%{SYSLOGPROG}: %{WORD:action} %{WORD:protocol} \(no connection\) from %{IP:ip_from_1}/%{NUMBER:port_from_1} to %{IP:ip_to_1}/%{NUMBER:port_to_1}",
        "%{SYSLOGPROG}: %{NOTSPACE} %{NOTSPACE:access_list_name} %{WORD:action} %{WORD:protocol} %{NOTSPACE:label_from}/%{IP:ip_from_1}\(%{NUMBER:port_from_1}\) -> %{NOTSPACE:label_to}/%{IP:ip_to_1}\(%{NUMBER:port_to_1}\) %{GREEDYDATA} \[%{GREEDYDATA}\]",
        "%{SYSLOGPROG}: %{WORD:action} local-host %{NOTSPACE:label_from}:%{IP:ip_from_1}( duration %{NOTSPACE:duration})?",
        "%{SYSLOGPROG}: %{WORD:protocol} %{WORD:object} %{WORD:action} from %{IP:ip_from_1}(/%{NUMBER:port_from_1})? to %{NOTSPACE:label_to}:%{IP:ip_to_1}(/%{NUMBER:port_to_1})?"
      ]
    }
    add_tag => [ "matched" ]
  }

  # Stage 2: if none of the specific formats matched, fall back to a generic
  # "<program>: <text>" syslog layout and keep the remainder as message_text.
  if "matched" not in [tags] {
    grok {
      match => { "message" => "%{SYSLOGPROG}: %{GREEDYDATA:message_text}" }
      add_tag => [ "matched" ]
    }
  }

  # Stage 3: if even the generic layout did not match, store the whole line
  # in message_text so the record is still written out.
  if "matched" not in [tags] {
    grok {
      match => { "message" => "%{GREEDYDATA:message_text}" }
    }
  }

  # Derive geographic information from each of the four captured IP addresses,
  # using the LogStash built-in GeoIP database. Each lookup writes into its
  # own target field so the results do not overwrite one another.
  geoip {
    source => "ip_from_1"
    target => "geoip_from_1"
    # add_field => [ "[geoip_from_1][latitude]", "%{[geoip_from_1][latitude]}" ]
    # add_field => [ "[geoip_from_1][longitude]", "%{[geoip_from_1][longitude]}" ]
  }
  geoip {
    source => "ip_from_2"
    target => "geoip_from_2"
    # add_field => [ "[geoip_from_2][latitude]", "%{[geoip_from_2][latitude]}" ]
    # add_field => [ "[geoip_from_2][longitude]", "%{[geoip_from_2][longitude]}" ]
  }
  geoip {
    source => "ip_to_1"
    target => "geoip_to_1"
    # add_field => [ "[geoip_to_1][latitude]", "%{[geoip_to_1][latitude]}" ]
    # add_field => [ "[geoip_to_1][longitude]", "%{[geoip_to_1][longitude]}" ]
  }
  geoip {
    source => "ip_to_2"
    target => "geoip_to_2"
    # add_field => [ "[geoip_to_2][latitude]", "%{[geoip_to_2][latitude]}" ]
    # add_field => [ "[geoip_to_2][longitude]", "%{[geoip_to_2][longitude]}" ]
  }

  # Convert geographic coordinates from text to numeric.
  # (Disabled. Kept for reference; the seventh entry previously repeated
  # geoip_to_1 latitude and has been corrected to geoip_to_2.)
  # mutate {
  # convert => [ "[geoip_from_1][latitude]", "float"]
  # convert => [ "[geoip_from_1][longitude]", "float"]
  # convert => [ "[geoip_from_2][latitude]", "float"]
  # convert => [ "[geoip_from_2][longitude]", "float"]
  # convert => [ "[geoip_to_1][latitude]", "float"]
  # convert => [ "[geoip_to_1][longitude]", "float"]
  # convert => [ "[geoip_to_2][latitude]", "float"]
  # convert => [ "[geoip_to_2][longitude]", "float"]
  # }

  # Replace the LogStash event timestamp with the value from the message.
  # NOTE(review): none of the named captures in the grok patterns above is
  # called "timestamp", so this filter may have nothing to parse and
  # @timestamp would stay at the receive time. Confirm where "timestamp" is
  # expected to come from (e.g. add a timestamp capture to the patterns).
  date {
    match => ["timestamp", "ISO8601"]
  }
}
output {
  # Emit the parsed fields as CSV. The minute-resolution date pattern in the
  # file name starts a new file every minute, so that files can be loaded
  # quickly into the data warehouse.
  csv {
    fields => [
      "@timestamp", "host", "program", "action", "direction", "protocol", "object",
      "label_from", "ip_from_1", "port_from_1", "user_from",
      "[geoip_from_1][country_code3]", "[geoip_from_1][city_name]", "[geoip_from_1][postal_code]", "[geoip_from_1][latitude]", "[geoip_from_1][longitude]",
      "ip_from_2", "port_from_2",
      "[geoip_from_2][country_code3]", "[geoip_from_2][city_name]", "[geoip_from_2][postal_code]", "[geoip_from_2][latitude]", "[geoip_from_2][longitude]",
      "label_to", "ip_to_1", "port_to_1", "user_to",
      "[geoip_to_1][country_code3]", "[geoip_to_1][city_name]", "[geoip_to_1][postal_code]", "[geoip_to_1][latitude]", "[geoip_to_1][longitude]",
      "ip_to_2", "port_to_2",
      "[geoip_to_2][country_code3]", "[geoip_to_2][city_name]", "[geoip_to_2][postal_code]", "[geoip_to_2][latitude]", "[geoip_to_2][longitude]",
      "duration", "bytes", "access_group", "message_text"
    ]
    path => "D:\ETLFiles\IT\In\syslog_%{+YYYYMMddHHmm}.csv"
  }
}