使用 Kafka Java API,我可以使用重新平衡侦听器来寻找主题的开头,如下所示(代码是 Scala,Kafka API 是 Java):
class SeekToBeginningRebalanceListener[K, V](val consumer: KafkaConsumer[K, V]) extends ConsumerRebalanceListener {
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
for (tp <- partitions.asScala) {
consumer.seekToBeginning(util.Arrays.asList(tp))
}
}
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { }
}
我会在订阅这样的主题时使用它:
kafkaConsumer.subscribe(java.util.Arrays.asList(topicName), new SeekToBeginningRebalanceListener(kafkaConsumer))
如何使用 confluent-kafka python api 做到这一点?
我可以编写一个类似的分区分配回调函数来调用:
def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
print("assigned to partition. topic={}. partition={}. offset={}. error={}".format(
topic_partition.topic, topic_partition.partition, topic_partition.offset, topic_partition.error))
# This runs but has no effect.
topic_partition.offset = 0
但我找不到任何 API 来进行搜索。我怎么做?