9

我正在使用:https ://github.com/mumrah/kafka-python作为 Python 中的 kafka api。我想获取指定主题的分区数。我怎么做?

4

4 回答 4

14

可能是一个稍微简单的解决方案,但是:

from kafka import KafkaClient

client = KafkaClient('SERVER:PORT')
topic_partition_ids = client.get_partition_ids_for_topic(b'TOPIC')
len(topic_partition_ids)

在 Python 3.4.3 / kafka-python 0.9.3 上测试

于 2015-05-18T14:07:52.657 回答
2

我在尝试解决这个完全相同的问题时发现了这个问题。我知道这个问题很老,但这是我想出的解决方案(使用Kazoo与 zookeeper 交谈):

from kazoo.client import KazooClient

class KafkaInfo(object):
    def __init__(self, hosts):
        self.zk = KazooClient(hosts)
        self.zk.start()

    def topics(self):
        return self.zk.get_children('/brokers/topics')

    def partitions(self, topic):
        strs = self.zk.get_children('/brokers/topics/%s/partitions' % topic)
        return map(int, strs)

    def consumers(self):
        return self.zk.get_children('/consumers')

    def topics_for_consumer(self, consumer):
        return self.zk.get_children('/consumers/%s/offsets' % consumer)

    def offset(self, topic, consumer, partition):
        (n, _) = self.zk.get('/consumers/%s/offsets/%s/%d' % (consumer, topic, partition))
        return int(n)
于 2015-05-07T06:34:10.737 回答
1

Python 3.8.10/kafka-python 2.0.2 解决方案:

from kafka import KafkaConsumer

def get_partitions_number(server, topic):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=server
    )
    partitions = consumer.partitions_for_topic(topic)
    return len(partitions)

partitions_for_topic

于 2021-08-04T07:03:53.700 回答
0

对于那些使用 Confluent-Python 或企业 API 的人。这可以通过以下方式完成:

def count_partitions(my_partitions) -> int:
    count = 0
    for part in my_partitions:
        count = count + 1
    return count

cluster_data: ClusterMetadata = producer.list_topics(topic=TOPIC)
    topic_data: TopicMetadata = cluster_data.topics[TOPIC]
    available_partitions: PartitionMetadata = topic_data.partitions
    print(count_partitions(available_partitions))

于 2019-06-28T16:32:50.383 回答