4

我在 MSK 上创建了一个 Kafka 集群,现在我正在尝试使用 python 连接到集群。

我写了这个短代码:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['host1:9092', 'host2:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
    api_version=(2, 4, 1)
)

producer.send('test', value={'hello':'world'})

问题是每次我运行它时都会出现这个错误:

KafkaTimeoutError: Failed to update metadata after 60.0 secs.

我认为这可能与 Kafka 创建主题有关,所以我将这一行添加到配置中。

auto.create.topics.enable=true

但我仍然遇到同样的错误。

这是我的完整配置文件:

default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
socket.request.max.bytes=104857600
unclean.leader.election.enable=true
auto.create.topics.enable=true
zookeeper.connection.timeout.ms=5000

我在这里想念什么?我在某处读到可能与 SSL 身份验证有关的内容,但在任何步骤中,都没有任何 .pem 文件、.ca 文件或类似的文件。

4

1 回答 1

2

您可能无法连接到 MSK。您遇到的错误是超时错误。

第一次开始使用 MSK 时的一个错误假设是,您可以从 AWS 网络外部连接到它。这是一个错误的假设。AWS 有一份关于如何访问 MSK 集群的详细文档。

如果您没有从MSK VPC 内的 EC2 实例运行客户端,您将无法访问它。即使你在 MSK 安全组策略中打开了相关的安全规则。

我花时间尝试多个代理从 AWS 外部访问 MSK,但没有成功。只需按照我上面提到的指南进行操作,它可能会解决您的连接问题。

此外,如果您是 MSK 的新手,我强烈建议您阅读入门教程,或者至少阅读第 5 步(创建主题)和第 6 步(生产和使用数据)。

于 2021-03-07T10:08:23.630 回答