我希望能够使用pykafka以编程方式在 Kafka 中创建主题。我知道访问 TopicDict 会自动创建一个主题,如果一个主题不存在,但我不知道如何控制分区/副本的数量。此外,它还有一个令人讨厌的错误,如果 Kafka 出现故障,它最终会陷入无限循环。基本上我想做如下的事情:
create_topic('mytopic', partitions=2, replicas=3)
我希望能够使用pykafka以编程方式在 Kafka 中创建主题。我知道访问 TopicDict 会自动创建一个主题,如果一个主题不存在,但我不知道如何控制分区/副本的数量。此外,它还有一个令人讨厌的错误,如果 Kafka 出现故障,它最终会陷入无限循环。基本上我想做如下的事情:
create_topic('mytopic', partitions=2, replicas=3)
Pykafka 是 Kafka 生产者和消费者 API 的 Python 实现,您想要实现的是在 Kafka 中使用另一个 API 执行的操作,即管理/操作 API(实际上是一组 Java 类)。我不认为 Pykafka 有一个 API/包装器。您可能会观察到一个由 Kafka 自动创建的主题。您可以做的是使用属性配置自动创建主题的默认分区数和副本数。
你可以这样做subprocess
如果你安装Kafka 二进制文件,你可以做这样的事情
from pykafka import KafkaClient
import subprocess
client = KafkaClient(hosts="localhost:9092")
subprocess.Popen("PATH/TO/KAFKA/BINARY/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic testtopic --replication-factor 1 --partitions 10".split())