0

使用 Kafka Java API,我可以使用重新平衡侦听器来寻找主题的开头,如下所示(代码是 Scala,Kafka API 是 Java):

class SeekToBeginningRebalanceListener[K, V](val consumer: KafkaConsumer[K, V]) extends ConsumerRebalanceListener {
  override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
    for (tp <- partitions.asScala) {
      consumer.seekToBeginning(util.Arrays.asList(tp))
    }
  }

  override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { }
}

我会在订阅这样的主题时使用它:

kafkaConsumer.subscribe(java.util.Arrays.asList(topicName), new SeekToBeginningRebalanceListener(kafkaConsumer))

如何使用 confluent-kafka python api 做到这一点?

我可以编写一个类似的分区分配回调函数来调用:

def on_assign_callback(consumer, topic_partitions):
    for topic_partition in topic_partitions:
        print("assigned to partition. topic={}. partition={}. offset={}. error={}".format(
            topic_partition.topic, topic_partition.partition, topic_partition.offset, topic_partition.error))

        # This runs but has no effect.
        topic_partition.offset = 0

但我找不到任何 API 来进行搜索。我怎么做?

4

1 回答 1

1

来自https://github.com/confluentinc/confluent-kafka-python/issues/11

这对我有用:

def on_assign (c, ps):
    for p in ps:
        p.offset=-2
    c.assign(ps)

c.subscribe(['test'], on_assign=on_assign)

seekToBeginning当前未实现 Confluent Kafka Python API 的功能。

于 2020-08-30T14:14:48.110 回答