0

带有 netflow 模块的 Logstash 6.2.4

弹性搜索版本:6.2.4

Ubuntu 16.04 LTS

我有一个问题,logstash 正在侦听正确的端口,但似乎没有收集 netflow 数据并将其传递给 elasticsearch。

我们网络中的路由器正在将其 netflow 数据发送到服务器 A,而 nfcap 正在侦听端口 9995,因此尝试使用服务器 A 上的 netflow 模块运行 logstash 会导致地址正在使用错误。所以,我使用 iptables 复制数据包并将它们转发到不同的服务器,服务器 B,就像这样。

iptables -t mangle -A PREROUTING -p udp --dport 9995 -j TEE --gateway <Server B ip address>

使用 tcpdump 检查,我可以看到服务器 B 接收到的重复数据包,以及服务器 A 的 IP 地址。输出如下,但出于安全原因,我已经编辑了 IP 地址。

tcpdump -i eno1 -n dst port 9995
12:49:49.130772 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.131067 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133504 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133527 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133533 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1260
12:49:49.391871 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1452
12:49:49.391894 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1368

所以,我知道服务器 B 正在端口 9995 上接收数据包。使用 netstat 检查也显示了这一点。

netstat -an | grep 9995
udp        0      0 0.0.0.0:9995            0.0.0.0:*

logstash.yml 如下

node.name: server-b
path.data: /var/lib/logstash
http.host: "0.0.0.0"

modules:
  - name: netflow
    var.input.udp.port: 9995 # Inbound connections
    var.elasticsearch.hosts: "<ip address>:9200"
    var.kibana.host: "<ip address>:5601"

path.logs: /var/log/logstash   

检查 /var/log/logstash/logstash-plain.log,我看到的唯一警告是关于 elasticsearch 的版本大于版本 6,因此不会使用 type 来确定文档类型。

[2018-07-06T12:58:13,771][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-07-06T12:58:13,817][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-06T12:58:17,599][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"module-netflow", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-07-06T12:58:17,733][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://<ip address>:9200/]}}
[2018-07-06T12:58:17,734][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://<ip address>:9200/, :path=>"/"}
[2018-07-06T12:58:17,784][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://<ip address>:9200/"}
[2018-07-06T12:58:17,810][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-07-06T12:58:17,810][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2018-07-06T12:58:17,811][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//<ip address>:9200"]}
[2018-07-06T12:58:18,088][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,101][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,102][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,120][INFO ][logstash.inputs.udp      ] Starting UDP listener {:address=>"0.0.0.0:9995"}
[2018-07-06T12:58:18,126][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"module-netflow", :thread=>"#<Thread:0x16700849@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:247 sleep>"}
[2018-07-06T12:58:18,131][INFO ][logstash.inputs.udp      ] UDP listener started {:address=>"0.0.0.0:9995", :receive_buffer_bytes=>"212992", :queue_size=>"2000"}
[2018-07-06T12:58:18,135][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["module-netflow"]}

ElasticSearch 正在运行并且正在从 packetbeat 和 filebeat 接收数据,/var/log/elasticsearch/elasticsearch.log 中没有任何内容可以提示 elasticsearch 的任何错误。但是,elasticsearch 没有用于 netflow 的索引模式。另一方面,Kibana 确实如此。

因此,服务器 B 上的 logstash 正在侦听 0.0.0.0:9995,端口 9995 已打开并接收来自服务器 A 的数据包,但 logstash 无法识别这些数据包。我的假设是服务器 B 忽略它们,因为目标 IP 地址是服务器 A 的 IP 地址。这听起来对吗?如果是这样,这附近有吗?

有没有更好的方法将重复的数据包从服务器 A 转发到服务器 B 并让 logstash 读取它们?

不幸的是,无法将另一个 netflow 导出器目标添加到路由器配置中。

4

1 回答 1

0

我会回答我自己的问题。

服务器 B 确实忽略了 netflow 数据,因为它无法识别 IP 地址。我将服务器 A 的 IP 地址添加为环回接口,它按预期工作。

这可能不是最好的解决方案,也是在生产环境中避免使用的解决方案,但出于测试目的,它应该没问题。

于 2018-07-09T01:18:02.417 回答