Logstash 6.2.4 with the netflow module
Elasticsearch version: 6.2.4
Ubuntu 16.04 LTS
I have a problem: Logstash is listening on the correct port, but it does not appear to be collecting the NetFlow data and passing it to Elasticsearch.
The routers in our network send their NetFlow data to Server A, where nfcapd is already listening on port 9995, so trying to run Logstash with the netflow module on Server A results in an address-already-in-use error. I therefore used iptables to duplicate the packets and forward them to a different server, Server B, like this:
iptables -t mangle -A PREROUTING -p udp --dport 9995 -j TEE --gateway <Server B ip address>
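For reference, the TEE target clones matching packets without modifying them, so the copies that reach Server B still carry Server A's address as the destination. The rule's counters can confirm it is actually matching traffic (a diagnostic sketch; run as root on Server A):

```shell
# List the mangle/PREROUTING chain with per-rule packet and byte counters;
# non-zero counters on the TEE rule confirm it is matching the NetFlow traffic.
iptables -t mangle -L PREROUTING -v -n
```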
Checking with tcpdump, I can see the duplicated packets arriving at Server B, still addressed to Server A's IP. The output is below; I have redacted the IP addresses for security reasons.
tcpdump -i eno1 -n dst port 9995
12:49:49.130772 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.131067 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133504 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133527 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133533 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1260
12:49:49.391871 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1452
12:49:49.391894 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1368
So I know that Server B is receiving the packets on port 9995. Checking with netstat shows this too:
netstat -an | grep 9995
udp 0 0 0.0.0.0:9995 0.0.0.0:*
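Note that both tcpdump and netstat can be misleading here: tcpdump captures packets before the kernel decides whether to deliver them to a local socket, and netstat only shows that a socket is bound, not that data reaches it. A quick userspace check can separate the two (a diagnostic sketch; stop Logstash first so port 9995 is free, and note that some netcat variants want `nc -ulp 9995` instead):

```shell
# Bind a plain UDP socket on 9995 and dump anything that arrives;
# if this stays silent while tcpdump still shows traffic, the kernel
# is not delivering the packets to local sockets.
nc -u -l 9995 | xxd | head
```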
logstash.yml is as follows:
node.name: server-b
path.data: /var/lib/logstash
http.host: "0.0.0.0"
modules:
  - name: netflow
    var.input.udp.port: 9995 # Inbound connections
    var.elasticsearch.hosts: "<ip address>:9200"
    var.kibana.host: "<ip address>:5601"
path.logs: /var/log/logstash
Checking /var/log/logstash/logstash-plain.log, the only warning I see is that the Elasticsearch cluster is version 6.x or above, so the `type` event field won't be used to determine the document type:
[2018-07-06T12:58:13,771][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-07-06T12:58:13,817][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-06T12:58:17,599][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"module-netflow", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-07-06T12:58:17,733][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://<ip address>:9200/]}}
[2018-07-06T12:58:17,734][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://<ip address>:9200/, :path=>"/"}
[2018-07-06T12:58:17,784][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://<ip address>:9200/"}
[2018-07-06T12:58:17,810][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-07-06T12:58:17,810][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2018-07-06T12:58:17,811][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//<ip address>:9200"]}
[2018-07-06T12:58:18,088][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,101][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,102][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,120][INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:9995"}
[2018-07-06T12:58:18,126][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"module-netflow", :thread=>"#<Thread:0x16700849@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:247 sleep>"}
[2018-07-06T12:58:18,131][INFO ][logstash.inputs.udp ] UDP listener started {:address=>"0.0.0.0:9995", :receive_buffer_bytes=>"212992", :queue_size=>"2000"}
[2018-07-06T12:58:18,135][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["module-netflow"]}
Elasticsearch is running and receiving data from packetbeat and filebeat, and there is nothing in /var/log/elasticsearch/elasticsearch.log to suggest any Elasticsearch errors. However, Elasticsearch has no index pattern for netflow. Kibana, on the other hand, does.
So Logstash on Server B is listening on 0.0.0.0:9995, and port 9995 is open and receiving packets from Server A, but Logstash is not picking them up. My assumption is that Server B discards them because the destination IP address is Server A's address. Does that sound right? If so, is there a way around it?
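One workaround I am considering (an untested sketch): since the kernel on Server B will not deliver UDP datagrams addressed to Server A's IP to a locally bound socket, a NAT rule on Server B could rewrite the destination address before local delivery:

```shell
# On Server B: rewrite the destination of the duplicated NetFlow packets
# to Server B's own address so the kernel hands them to Logstash's UDP socket.
# <Server B ip address> is a placeholder, as elsewhere in this post.
iptables -t nat -A PREROUTING -p udp --dport 9995 -j DNAT --to-destination <Server B ip address>
```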
Is there a better way to forward the duplicated packets from Server A to Server B and have Logstash read them?
Unfortunately, adding another NetFlow export destination to the router configuration is not an option.