1

我对整个 ELK 设置部分比较陌生,因此请耐心等待。

我想要做的是将存储在 S3 上的 cloudtrail 日志发送到本地托管的(我的意思是非 AWS)ELK 设置中。我没有在设置中的任何地方使用 Filebeat。我相信它不是强制使用它。Logstash 可以直接向 ES 下发数据。

  1. 我在这里吗?

一旦数据在 ES 中,我只想在 Kibana 中可视化它。

到目前为止,我已经尝试过什么,因为我的 ELK 已启动并正在运行,并且设置中没有涉及 Filebeat:

使用 S3 logstash 插件

/etc/logstash/conf.d/aws_ct_s3.conf的内容

input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}

output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}

当使用上述配置启动 logstash 时,我可以看到一切正常。使用 head google chrome 插件,我可以看到文档不断被添加到指定的索引中。事实上,当我浏览它时,我也可以看到有我需要的数据。我也能在 Kibana 方面看到同样的情况。

这些 gzip 文件中的每一个都具有以下格式的数据:

{
  "Records": [
    dictionary_D1,
    dictionary_D2,
    .
    .
    .
  ]
}

我想让上面字典列表中的每一个字典成为 Kibana 中的一个单独事件。通过一些谷歌搜索,我知道我可以使用split过滤器来实现我想要的。现在我aws_ct_s3.conf看起来像:

input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}

filter {
split {
   field => "Records"
 }
}

output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}

有了这个,我实际上在 Kibana 上获得了我需要的数据。

现在的问题是

在没有过滤器的情况下,Logstash 从 S3 传送到 Elasticsearch 的文档数量以 GB 为单位,而在应用过滤器后,它仅停止在大约 5000 个文档。

我不知道我在这里做错了什么。有人可以帮忙吗?

当前配置:

java -XshowSettings:vm => Max Heap Size: 8.9 GB

elasticsearch jvm options => max and min heap size: 6GB

logstash jvm options => max and min heap size: 2GB

ES version - 6.6.0

LS version - 6.6.0

Kibana version - 6.6.0

这是当前堆使用情况的样子: 在此处输入图像描述

4

0 回答 0