0

我有以下代码可以连接到 Kafka

Properties props = new Properties();
props.put("bootstrap.servers", "myconfluentkafkabroker:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_CG");
props.put("group.instance.id", "my_instance_CG_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("key.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
props.put("value.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));

KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("MyTopic"));
try {
    while (true) { 
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            log.debug("topic = %s, partition = %d, offset = %d,"
                customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());

            int updatedCount = 1;
            if (custCountryMap.countainsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount)

            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4));
        }
    }
} finally {
    consumer.close();
}

代码没有抛出任何错误,但我仍然没有看到列出的消费者

在此处输入图像描述

这会是一个问题吗?

props.put("group.instance.id", "my_instance_CG_id");

4

1 回答 1

2

您应该使用 Kafka 提供的内置工具验证您看到的信息,例如kafka-consumer-groups.sh

您还需要实际轮询消息并提交偏移量,而不仅仅是在看到任何内容之前订阅。

否则,对于特定的控制中心仪表板,可能需​​要您将监控拦截器添加到您的客户端

于 2021-10-25T17:19:02.930 回答