1

也许这是一个幼稚的问题,但它让我停留了一段时间。请多多包涵。

我有一个类 DataConsumer.java 实现ConsumerAwareRebalanceListener

@Component
public class DataConsumer implements ConsumerAwareRebalanceListener {
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // seek offsets based on a given timestamp
    }
    @KafkaListener(topics = "dataTopic", containerFactory = "kafkaListenerContainerFactory")
    receive(ConsumerRecord payload) {}
}

所以为了onPartitionsAssigned工作,我需要调用另一个类中定义setConsumerRebalanceListener的方法,如下所示:kafkaListenerContainerFactory

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.getContainerProperties().setConsumerRebalanceListener(____________);
    // rest part omitted
}

我的问题是关于____________上面的这部分。我应该放什么?

在我的理解中,kafkaListenerContainerFactory当我们在 DataConsumer 类中初始化@KafkaListener容器时会调用方法,这意味着已经有一个现有的 DataConsumer 实例来保存@kafkaLister. 我怎样才能将已经存在的 DataConsumer 实例传递给setConsumerRebalanceListener函数?

我可以搜索到的所有示例代码片段如下:

setConsumerRebalanceListener(new ConsumerRebalanceListener() {
    //override the functions
})

但这不是创建一个新实例吗?如果我把new DataConsumer()它放在现有实例中会失去一些状态(例如寻求偏移量的时间戳),所以这不起作用。

4

1 回答 1

1

您可以声明DataConsumer@Bean(而不是使用@Component),然后您可以在那里注入您的 bean。

但是,在这种情况下,这是错误的机制。

改为实现ConsumerSeekAware- 容器将自动检测到您的侦听器实现了它并调用它的onPartitionsAssigned.

在文档中查找特定偏移量。

于 2020-06-01T12:56:50.697 回答