1

我想将 Metribeat 数据发送到 Kafka。从 kafka 到 Logstash。

这是我的metricbeat.yml

metricbeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 1
  index.codec: best_compression

setup.dashboards.enabled: false

output.kafka:

    hosts: ["kafka:9092"]   // I only have one host. 
    topic: "%{[fields.log_topic]}"   
    compression: gzip

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

而从Kakfa到Logstash,就是这个配置文件。

input {
    kafka {
            bootstrap_servers => "localhost:9092"
            topics => ["test"]
    }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"

从 Logstash,我想将数据发送到 Elasticsearch 以由 Kibana 可视化。

但是,我在 Elasticsearch 中没有看到索引。

.\metricbeat.exe setup -e在运行 (Windows) 之前正在运行.\start-service metricbeat.

Elasticsearch 服务器、Kibana 服务器、Zookeeper 服务器、Kafka 服务器运行良好。

我的logstash看起来还不错。下面是我从我的 cmd 中看到的内容。

[2019-05-23T17:26:51,668][INFO][org.apache.kafka.common.utils.AppInfoParser] Kafka 版本:2.1.0 [2019-05-23T17:26:51,738][INFO][org. apache.kafka.common.utils.AppInfoParser] Kafka commitId:eec43959745f444f [2019-05-23T17:26:52,208][INFO][org.apache.kafka.clients.Metadata] 集群 ID:eJYo7GgaTZitGoeiROlk2w [2019-05-23T17: 26:52,211][INFO][logstash.agent] 成功启动 Logstash API 端点 {:port=>9600} [2019-05-23T17:26:52,222][INFO][org.apache.kafka.clients.consumer.internals .AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 发现组协调器 DESKTOP-MOVCIN1:9092 (id: 2147483647 rack: null) [2019-05-23T17:26:52,229][INFO][org.apache. kafka.clients.consumer.internals.ConsumerCoordinator] [消费者clientId=logstash-0,groupId=logstash] 撤销之前分配的分区 [] [2019-05-23T17:26:52,231][INFO][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash ](重新)加入组 [2019-05-23T17:26:52,274][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 成功加入第 23 代组 [2019-05-23T17:26:52,281][INFO][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 设置新分配的分区 [测试-0]internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 成功加入第 23 代组 [2019-05-23T17:26:52,281][INFO][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator ] [Consumer clientId=logstash-0, groupId=logstash] 设置新分配的分区 [test-0]internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] 成功加入第 23 代组 [2019-05-23T17:26:52,281][INFO][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator ] [Consumer clientId=logstash-0, groupId=logstash] 设置新分配的分区 [test-0]

谁能给我一些指导?

4

1 回答 1

1

我终于设法使用Metricbeat收集系统数据,通过Kafka将它们发送到Logstash并将它们存储在Elasticsearch中并在Kibana中查看它们。

这还不是一个理想的答案。我会更新它,因为我将来会更好地理解。

对于metricbeat.yml配置,

output.kafka:
    hosts: ["localhost:9092"] 
    topic: "testkafka"    // I created this topic in Kafka earlier. 

对于Logstash配置,

input { 
    kafka { 
              bootstrap_servers => "localhost:9092"
              topics => ["testkafka"]
    }
}

output {
   elasticsearch { 
       hosts => ["localhost:9200"]
       index => "testkafka" 
   }
}

至少,通过这些配置,我能够将 ELK 与 K(Kafka) 集成。

于 2019-06-10T08:59:42.823 回答