我对整个 ELK 设置部分比较陌生,因此请耐心等待。
我想要做的是将存储在 S3 上的 cloudtrail 日志发送到本地托管的(我的意思是非 AWS)ELK 设置中。我没有在设置中的任何地方使用 Filebeat。我相信它不是强制使用它。Logstash 可以直接向 ES 下发数据。
- 我在这里吗?
一旦数据在 ES 中,我只想在 Kibana 中可视化它。
到目前为止,我已经尝试过什么,因为我的 ELK 已启动并正在运行,并且设置中没有涉及 Filebeat:
使用 S3 logstash 插件
/etc/logstash/conf.d/aws_ct_s3.conf的内容
input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}
output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}
当使用上述配置启动 logstash 时,我可以看到一切正常。使用 head google chrome 插件,我可以看到文档不断被添加到指定的索引中。事实上,当我浏览它时,我也可以看到有我需要的数据。我也能在 Kibana 方面看到同样的情况。
这些 gzip 文件中的每一个都具有以下格式的数据:
{
"Records": [
dictionary_D1,
dictionary_D2,
.
.
.
]
}
我想让上面字典列表中的每一个字典成为 Kibana 中的一个单独事件。通过一些谷歌搜索,我知道我可以使用split
过滤器来实现我想要的。现在我aws_ct_s3.conf
看起来像:
input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}
filter {
split {
field => "Records"
}
}
output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}
有了这个,我实际上在 Kibana 上获得了我需要的数据。
现在的问题是
在没有过滤器的情况下,Logstash 从 S3 传送到 Elasticsearch 的文档数量以 GB 为单位,而在应用过滤器后,它仅停止在大约 5000 个文档。
我不知道我在这里做错了什么。有人可以帮忙吗?
当前配置:
java -XshowSettings:vm => Max Heap Size: 8.9 GB
elasticsearch jvm options => max and min heap size: 6GB
logstash jvm options => max and min heap size: 2GB
ES version - 6.6.0
LS version - 6.6.0
Kibana version - 6.6.0