1

我正在接收一条 JSON 消息(Cloudtrail,许多对象连接在一起),当我完成过滤它时,Logstash 似乎没有正确解析消息。就好像散列被简单地转储到一个字符串中。

无论如何,这是输入和过滤器。

input {
  s3 {
    bucket => "stanson-ops"
    delete => false
    #snipped unimportant bits
    type => "cloudtrail"
  }
}

filter {
  if [type] == "cloudtrail" {
    json { # http://logstash.net/docs/1.4.2/filters/json
      source => "message"
    }
    ruby {
      code => "event['RecordStr'] = event['Records'].join('~~~')"
    }
    split {
      field => "RecordStr"
      terminator => "~~~"
      remove_field => [ "message", "Records" ]
    }
  }
}

当我完成时,elasticsearch 条目包含一个RecordStr带有以下数据的键。它没有message字段,也没有Records字段。

{"eventVersion"=>"1.01", "userIdentity"=>{"type"=>"IAMUser", "principalId"=>"xxx"}}

请注意,这不是JSON 样式,它已被解析。(这对于 concat->split 的工作很重要)。

因此,RecordStr作为一个值,键看起来不太正确。此外,在 Kibana 中,可过滤字段包括RecordStr(无子字段)。它包括一些不再存在的条目:Records.eventVersion, Records.userIdentity.type.

这是为什么?如何获得正确的字段?

编辑 1这是输入的一部分。

{"Records":[{"eventVersion":"1.01","userIdentity":{"type":"IAMUser",

这是未经修饰的 JSON。看起来文件的主体(上面)在message字段中,json提取它,我最终在Records字段中得到一个记录数组。这就是我加入并拆分它的原因——然后我得到了单独的文档,每个文档都有一个RecordStr条目。但是,模板(?)似乎不理解新结构。

4

1 回答 1

0

我已经制定了一种方法,可以根据您的要求索引相应的 CloudTrail 字段。以下是修改后的输入和过滤器配置:

input {
  s3 {
    backup_add_prefix => \"processed-logs/\"
    backup_to_bucket => \"test-bucket\"
    bucket => \"test-bucket\"
    delete => true
    interval => 30
    prefix => \"AWSLogs/<account-id>/CloudTrail/\"
    type => \"cloudtrail\"
  }
}

filter {
  if [type] == \"cloudtrail\" {
    json {
      source => \"message\"
    }
    ruby {
      code => \"event.set('RecordStr', event.get('Records').join('~~~'))\"
    }
    split {
      field => \"RecordStr\"
      terminator => \"~~~\"
      remove_field => [ \"message\", \"Records\" ]
    }
    mutate {
      gsub => [
        \"RecordStr\", \"=>\", \":\"
      ]
    }
    mutate {
      gsub => [
        \"RecordStr\", \"nil\", \"null\"
      ]
    }
    json {
      skip_on_invalid_json => true
      source => \"RecordStr\"
      target => \"cloudtrail\"
    }
    mutate {
      add_tag => [\"cloudtrail\"]
      remove_field=>[\"RecordStr\", \"@version\"]
    }
    date {
      match => [\"[cloudtrail][eventTime]\",\"ISO8601\"]
    }
  }
}

这里的关键观察是,一旦拆分完成,我们在事件中不再拥有有效的 json,因此需要执行变异替换('=>' 到 ':' 和 'nil' 到 'null')。此外,我发现从 CloudTrail eventTime 中获取时间戳并清理一些不必要的字段很有用。

于 2018-10-09T00:39:58.957 回答