使用 logstash 5.0.0,以 kafka 源作为输入 -> 获取数据并在 Elasticsearch 中生成输出。(弹性搜索 5.0.0 版)
Logstash 配置文件:
input{
kafka{
bootstrap_servers => "XXX.XXX.XX.XXX:9092","XXX.XXX.XX.XXX:9092","XXX.XXX.XX.XXX:9092"
topics => ["a-data","f-data","n-data"]
group_id => "sound"
auto_offset_reset => "earliest"
consumer_threads => 2
}
}
filter{
json{
source => "message"
}
}
output {
elasticsearch {
hosts => [ "XXX.XXX.XX.XXX:9200" ]
}
}
当我运行以下配置时,我收到以下错误。
$ ./logstash -f sound.conf
Sending Logstash logs to /logstash-5.0.0/logs which is now configured vi a log4j2.properties.
[2017-01-17T10:53:29,273][ERROR][logstash.agent ] fetched an invalid c onfig {:config=>"input{\nkafka{\nbootstrap_servers => \"XX.XXX.XXX.XX:9092\",\"XXX.XXX.XX.XXX:9092\",\"XXX.XXX.XX.XXX:9092\"\ntopics => [\"a-data\",\"f-data\ ",\"n-data\"]\ngroup_id => \"sound\"\nauto_offset_reset => \"earliest\"\nc onsumer_threads => 2\n}\n}\nfilter{\njson{\nsource => \"message\"\n}\n}\noutput {\nelasticsearch {\nhosts => [ \"XX.XX.XXX.XX:9200\" ]\n}\n}\n\n", :reason=>"Ex pected one of #, {, } at line 3, column 40 (byte 54) after input{\nkafka{\nboots trap_servers => \"XX.XX.XXX.XX:9092\""}
任何人都可以帮助我进行此配置。