我正在使用 confluent-kafka-python 包与 Kafka 服务器进行交互。我可以成功创建主题并将事件推送给它。但是,我的问题在于当我启动多个节点(在 Docker 中运行)时,如果第二个实例也尝试创建主题,我会收到错误消息。在创建新主题之前,我需要先检查主题是否已经存在。
from confluent_kafka.admin import AdminClient, NewTopic
kafka_admin = AdminClient({"bootstrap.servers": server})
# First check here if the topic already exists!
if not topic_exists(topic): # <-- how to accomplish this?
new_kafka_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
results = kafka_admin.create_topics([new_kafka_topic])
谢谢你的帮助!