0

我想知道将 Kafka 主题键值与消息一起作为单独的字段包含在 Elasticsearch 中使用 Logstash 进行索引。Kafka“主题”天气包含以下消息作为键值对:

波士顿:99
波士顿:89
纽约:75
纽约:85

我正在使用以下 Logstash 配置进行索引:

input {
    kafka { 
    bootstrap_servers => "localhost:9092"
    topics => ["weather"]
    }
}
output {
   elasticsearch {
      hosts => ["localhost:9200"]
      index => "weather"
      workers => 1
    }
}

但是,只有值被索引,而不是键。索引如下所示:

 {
    "_index" : "weather",
    "_type" : "doc",
    "_id" : "48cGgXMBtkZlibwHwyWW",
    "_score" : 1.0,
    "_source" : {
      "message" : "99",
      "@timestamp" : "2020-07-24T13:32:50.820Z",
      "@version" : "1"
    }
  {
    "_index" : "weather",
    "_type" : "doc",
    "_id" : "48cGgXMBtkZlibwHwyWW",
    "_score" : 1.0,
    "_source" : {
      "message" : "89",
      "@timestamp" : "2020-07-24T13:32:50.820Z",
      "@version" : "1"
    }

我还需要在索引中包含城市名称(主题的键)。一旦我拥有两者,最终目标是在 Kibana 中针对每个城市的天气进行不同类型的可视化。

提前致谢。

4

2 回答 2

0

我将它包含在 conf 文件中,现在我可以看到关键信息:

filter {
        mutate {
          copy => { "[@metadata][kafka]" => "kafka" }
        }
}
于 2020-07-24T22:53:47.340 回答
0
input {
    kafka { 
    bootstrap_servers => "localhost:9092"
    topics => ["weather"]
    decorate_events => true         (!!!)
    }
}
filter {
    mutate {
        add_field => {
            "key" => %{[@metadata][kafka][key]}" (!!!)
        }
    }
}
output {
elasticsearch {
    hosts => ["localhost:9200"]
    index => "weather"
    workers => 1
    }
}

参考:https ://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#_metadata_fields

  1. decorate_events 选项设置为“真”
  2. 添加“add_field”过滤器
于 2020-07-25T02:38:25.543 回答