我在 MSK 上创建了一个 Kafka 集群,现在我正在尝试使用 python 连接到集群。
我写了这个短代码:
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['host1:9092', 'host2:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
api_version=(2, 4, 1)
)
producer.send('test', value={'hello':'world'})
问题是每次我运行它时都会出现这个错误:
KafkaTimeoutError: Failed to update metadata after 60.0 secs.
我认为这可能与 Kafka 创建主题有关,所以我将这一行添加到配置中。
auto.create.topics.enable=true
但我仍然遇到同样的错误。
这是我的完整配置文件:
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
socket.request.max.bytes=104857600
unclean.leader.election.enable=true
auto.create.topics.enable=true
zookeeper.connection.timeout.ms=5000
我在这里想念什么?我在某处读到可能与 SSL 身份验证有关的内容,但在任何步骤中,都没有任何 .pem 文件、.ca 文件或类似的文件。