我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索引导服务器并与它们建立网络连接,但没有消息通过。相反,在每条 Type 消息之后,A
我立即收到一个 type B
... 并最终收到一个 type C
:
A [INFO] 2019-11-19T15:17:19.603Z <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR] 2019-11-19T15:17:19.605Z <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
是什么导致代理节点接受来自有希望的生产者的 TCP 连接,但随后又立即关闭它?
编辑
该主题已存在,并
kafka-topics.sh --list
显示它。我用过的所有客户端都有同样的问题:Kafka's
kafka-console-producer.sh
、kafka-python、confluent-kafka和kafkacatKafka 集群与我的所有其他机器位于同一个 VPC 中,其安全组允许该 VPC 内的任何传入和传出流量。
但是,它由 Amazon 的 Kafka 托管流 (MSK) 服务管理,这意味着我无法精细控制服务器安装设置(甚至不知道它们是什么)。MSK 只是发布 zookeeper 和消息代理 URL 供客户端使用。
生产者作为 AWS Lambda 函数运行,但是当我在普通 EC2 实例上运行它时问题仍然存在。
权限不是问题。我已经为 lambda 角色分配了它需要的所有 AWS 权限(AWS 总是非常明确地说明哪个操作需要哪个缺少的权限)。
连接性不是问题。我可以使用标准 telnet 访问动物园管理员和消息代理的 URL。但是,向 zookeepers 发出命令是有效的,而向消息代理发出命令总是最终失败。由于Kafka 在 TCP 上使用二进制协议,我不知道如何进一步调试问题。
编辑
正如建议的那样,我用
./kafkacat -b $BROKERS -L -d 经纪人
并得到:
7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
那么,这是客户端和代理 API 版本之间的一种不匹配吗?记住我无法控制 AWS 提供的 Kafka 集群的版本或配置,我该如何从中恢复?