8

我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索引导服务器并与它们建立网络连接,但没有消息通过。相反,在每条 Type 消息之后,A我立即收到一个 type B... 并最终收到一个 type C

A [INFO]    2019-11-19T15:17:19.603Z    <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR]   2019-11-19T15:17:19.605Z    <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

是什么导致代理节点接受来自有希望的生产者的 TCP 连接,但随后又立即关闭它?

编辑

  • 该主题已存在,并kafka-topics.sh --list显示它。

  • 我用过的所有客户端都有同样的问题:Kafka's kafka-console-producer.shkafka-pythonconfluent-kafkakafkacat

  • Kafka 集群与我的所有其他机器位于同一个 VPC 中,其安全组允许该 VPC 内的任何传入和传出流量。

  • 但是,它由 Amazon 的 Kafka 托管流 (MSK) 服务管理,这意味着我无法精细控制服务器安装设置(甚至不知道它们是什么)。MSK 只是发布 zookeeper 和消息代理 URL 供客户端使用。

  • 生产者作为 AWS Lambda 函数运行,但是当我在普通 EC2 实例上运行它时问题仍然存在。

  • 权限不是问题。我已经为 lambda 角色分配了它需要的所有 AWS 权限(AWS 总是非常明确地说明哪个操作需要哪个缺少的权限)。

  • 连接性不是问题。我可以使用标准 telnet 访问动物园管理员和消息代理的 URL。但是,向 zookeepers 发出命令是有效的,而向消息代理发出命令总是最终失败。由于Kafka 在 TCP 上使用二进制协议,我不知道如何进一步调试问题。

编辑

正如建议的那样,我用

./kafkacat -b $BROKERS -L -d 经纪人

并得到:

7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN

那么,这是客户端和代理 API 版本之间的一种不匹配吗?记住我无法控制 AWS 提供的 Kafka 集群的版本或配置,我该如何从中恢复?

4

2 回答 2

6

我认为这与 TLS 加密有关。默认情况下,MSK 启动一个同时接受 PLAINTEXT 和 TLS 的集群,但如果您以编程方式从集群中获取引导服务器,它只会为您提供 TLS 端口。如果您遇到这种情况,请尝试改用 PLAINTEXT 端口 9092。

要对 TLS 客户端进行身份验证,您需要生成一个证书:https ://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html然后需要将此证书放到您的 lambda 上并引用Producer 配置中的证书。

如果您只能将 MSK 集群配置为 PLAINTEXT,那么当您从 AWS SDK 获取引导服务器时,它将为您提供 PLAINTEXT 端口,您应该会很好。

于 2019-11-26T18:32:47.083 回答
1

由于它也不适用于非 python 客户端,因此它不太可能是库中的错误。

这似乎是一个网络问题

有一个名为 kafka 代理设置advertised.listeners,它指定客户端在第一次连接后将使用的地址。换句话说,这就是客户端消费或生产时发生的情况:

  1. 使用bootstrap.servers,它建立第一个连接并要求使用真实地址。

  2. advertised.listeners代理使用代理配置中指定的地址进行回复。

  3. 客户端尝试使用该新地址进行消费或生产。

这是一项安全功能,可防止可公开访问的代理被不应访问的客户端消费/生产。

如何诊断

运行以下命令:

$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L

返回

Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
1 brokers:
  broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092

在这种情况下,ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092是客户端指定的地址,即使客户端可以访问该地址/端口,ip-172-31-18-160.us-west-2.compute.internal:9092也将是用于消费/生产的地址。

现在,如果您在 AWS MSK 中运行 kafka,它可能会为您管理它。您必须确保您可以访问该命令返回的地址。如果不这样做,您可能需要更改它或从有权访问它的主机运行命令。

另一种选择可能是使用可以在内部访问该地址的堡垒主机打开 ssh 隧道。

您可以在以下位置找到更多详细信息:https ://rmoff.net/2018/08/02/kafka-listeners-explained

于 2019-11-23T10:36:18.717 回答