我想将 Metribeat 数据发送到 Kafka。从 kafka 到 Logstash。
这是我的metricbeat.yml
。
metricbeat.config.modules:
# Glob pattern for configuration loading
path: ${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
index.codec: best_compression
setup.dashboards.enabled: false
output.kafka:
hosts: ["kafka:9092"] // I only have one host.
topic: "%{[fields.log_topic]}"
compression: gzip
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
而从Kakfa到Logstash,就是这个配置文件。
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["test"]
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
从 Logstash,我想将数据发送到 Elasticsearch 以由 Kibana 可视化。
但是,我在 Elasticsearch 中没有看到索引。
我.\metricbeat.exe setup -e
在运行 (Windows) 之前正在运行.\start-service metricbeat
.
Elasticsearch 服务器、Kibana 服务器、Zookeeper 服务器、Kafka 服务器运行良好。
我的logstash看起来还不错。下面是我从我的 cmd 中看到的内容。
[2019-05-23T17:26:51,668][INFO][org.apache.kafka.common.utils.AppInfoParser] Kafka 版本:2.1.0 [2019-05-23T17:26:51,738][INFO][org. apache.kafka.common.utils.AppInfoParser] Kafka commitId:eec43959745f444f [2019-05-23T17:26:52,208][INFO][org.apache.kafka.clients.Metadata] 集群 ID:eJYo7GgaTZitGoeiROlk2w [2019-05-23T17: 26:52,211][INFO][logstash.agent] 成功启动 Logstash API 端点 {:port=>9600} [2019-05-23T17:26:52,222][INFO][org.apache.kafka.clients.consumer.internals .AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 发现组协调器 DESKTOP-MOVCIN1:9092 (id: 2147483647 rack: null) [2019-05-23T17:26:52,229][INFO][org.apache. kafka.clients.consumer.internals.ConsumerCoordinator] [消费者clientId=logstash-0,groupId=logstash] 撤销之前分配的分区 [] [2019-05-23T17:26:52,231][INFO][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash ](重新)加入组 [2019-05-23T17:26:52,274][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 成功加入第 23 代组 [2019-05-23T17:26:52,281][INFO][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 设置新分配的分区 [测试-0]internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 成功加入第 23 代组 [2019-05-23T17:26:52,281][INFO][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator ] [Consumer clientId=logstash-0, groupId=logstash] 设置新分配的分区 [test-0]internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 成功加入第 23 代组 [2019-05-23T17:26:52,281][INFO][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator ] [Consumer clientId=logstash-0, groupId=logstash] 设置新分配的分区 [test-0]
谁能给我一些指导?