也许这是一个幼稚的问题,但它让我停留了一段时间。请多多包涵。
我有一个类 DataConsumer.java 实现ConsumerAwareRebalanceListener:
@Component
public class DataConsumer implements ConsumerAwareRebalanceListener {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// seek offsets based on a given timestamp
}
@KafkaListener(topics = "dataTopic", containerFactory = "kafkaListenerContainerFactory")
receive(ConsumerRecord payload) {}
}
所以为了onPartitionsAssigned工作,我需要调用另一个类中定义setConsumerRebalanceListener的方法,如下所示:kafkaListenerContainerFactory
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.getContainerProperties().setConsumerRebalanceListener(____________);
// rest part omitted
}
我的问题是关于____________上面的这部分。我应该放什么?
在我的理解中,kafkaListenerContainerFactory当我们在 DataConsumer 类中初始化@KafkaListener容器时会调用方法,这意味着已经有一个现有的 DataConsumer 实例来保存@kafkaLister. 我怎样才能将已经存在的 DataConsumer 实例传递给setConsumerRebalanceListener函数?
我可以搜索到的所有示例代码片段如下:
setConsumerRebalanceListener(new ConsumerRebalanceListener() {
//override the functions
})
但这不是创建一个新实例吗?如果我把new DataConsumer()它放在现有实例中会失去一些状态(例如寻求偏移量的时间戳),所以这不起作用。