0

我正在创建消费者(其中包含单个消费者的消费者组):

    Properties properties = new Properties();
    properties.put("zookeeper.connect","localhost:2181");
    properties.put("auto.offset.reset", "largest");
    properties.put("group.id", groupId);
    properties.put("auto.commit.enable", "true");
    ConsumerConfig consumerConfig = new ConsumerConfig(properties);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    consumerMap.entrySet().stream().forEach(
                streams -> {
                    streams.getValue().stream().forEach(
                                stream -> {
                                    KafkaBasicConsumer customConsumer = new KafkaBasicConsumer();                               
                                    try {
                                        Future<?> consumerFuture = kafkaConsumerExecutor.submit(customConsumer);

                                        kafkaConsumersFuture.put(groupId, consumerFuture);                                                                  
                                    } catch (Exception e) {
                                        logger.error("---- Got error : "+ e.getMessage());
                                        logger.error("Exception : ", e);
                                    }
                                }
                            );
                }
            );

我为同一主题订阅了 2 个消费者。我通过存储其未来对象然后调用 consumerFuture.cancel(Boolean.TRUE); 来取消订阅消费者

现在我用上面的代码再次订阅了同一个消费者,它成功注册了。但是,当发布者现在发布时,新订阅的消费者没有收到消息,而另一个注册的消费者正在收到消息

我也在检查消费者的偏移量,当生产者发布但消费者没有收到消息时,它们会得到更新。生产前:

组主题 Pid 偏移 logSize 滞后

A T1 0 94 94 1

组主题 Pid 偏移 logSize 滞后

B T1 0 94 94 1

生产后:

组主题 Pid 偏移 logSize 滞后

A T1 0 95 97 2

组主题 Pid 偏移 logSize 滞后

B T1 0 94 97 2

我无法弄清楚这是否是生产者方面的问题(分区不够)或者我是否以不正确的方式创建了消费者此外,我无法弄清楚日志和滞后列在此意味着什么。

让我知道是否有人可以提供帮助或需要更多详细信息。

4

1 回答 1

0

我找到了解决问题的方法,感谢@nautilus 提醒更新。

我的主要目的是提供端点来订阅和取消订阅 kafka 中的消费者。由于 kafka 只提供订阅而不是取消订阅(只能手动),我不得不在 kafka 实现上编写层。

我将消费者对象存储在一个静态映射中,其中键为组 id(因为我的消费者组只能有一个消费者)

问题是当取消订阅和具有相同组 id 的旧消费者阻止新消费者获取消息时,我没有关闭消费者。

私有静态地图 kafkaConsumersFuture

根据一些参数,找出组 id

    kafkaConsumersFuture.put(groupId, consumerConnector);

在退订的同时,我做到了

    ConsumerConnector consumerConnector = kafkaConsumersFuture.get(groupId);
    if(consumerConnector!=null) {

        consumerConnector.shutdown();
        kafkaConsumersFuture.remove(groupId);
    }
于 2016-04-27T14:06:25.157 回答