4

我正在使用 confluent-kafka-python 包与 Kafka 服务器进行交互。我可以成功创建主题并将事件推送给它。但是,我的问题在于当我启动多个节点(在 Docker 中运行)时,如果第二个实例也尝试创建主题,我会收到错误消息。在创建新主题之前,我需要先检查主题是否已经存在。

from confluent_kafka.admin import AdminClient, NewTopic
kafka_admin = AdminClient({"bootstrap.servers": server})

# First check here if the topic already exists!
if not topic_exists(topic):  # <-- how to accomplish this?
    new_kafka_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
    results = kafka_admin.create_topics([new_kafka_topic])

谢谢你的帮助!

4

2 回答 2

3

我遇到了同样的问题,我通过以下方式进行了管理:

client = AdminClient({"bootstrap.servers": BROKER_URL})
topic_metadata = client.list_topics()
if topic_metadata.topics.get(self.topic_name) is None:
  self.create_topic()
于 2020-05-25T22:13:25.300 回答
0

该类AdminClientlist_topics方法可以传递您要检查的主题名称,因此您无需阅读现有主题的完整(可能很大)列表:

list_topics([topic=None][, timeout=-1])

文档:https ://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#id0

于 2022-02-24T15:50:20.987 回答