2

我刚开始使用 Kafka,对 Python 还很陌生。我正在使用这个名为的库kafka-python与我的 Kafka 代理进行通信。现在我需要从我的代码中动态创建一个主题,从我看到的文档中我可以调用create_topics()方法来做到这一点,但是我不确定如何获得这个类的实例。我无法从文档中理解这一点。

有人可以帮我弄这个吗?

4

1 回答 1

6

您首先需要创建一个KafkaAdminClient. 以下应该为您解决问题:

from kafka.admin import KafkaAdminClient, NewTopic


admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092", 
    client_id='test'
)

topic_list = [NewTopic(name="example_topic", num_partitions=1, replication_factor=1)]
admin_client.create_topics(new_topics=topic_list, validate_only=False)

或者,您可以使用confluent_kafkaclient,它是librdkafka的轻量级包装器:

from confluent_kafka.admin import AdminClient, NewTopic


admin_client = AdminClient({"bootstrap_servers": "localhost:9092"})
topic_list = [NewTopic("example_topic", 1, 1)]
admin_client.create_topics(topic_list)
于 2019-04-04T10:34:48.443 回答