0

我使用 3 个 Zookeeper 和每个代理配置了一个包含 3 个代理的 Kafka 集群。下图展示了我的集群的图形表示。

在此处输入图像描述

使用主机在同一网络中的生产者和消费者测试通过和命令192.168.0.10完美运行。kafka-console-producerkafka-console-consumer

基于这种情况,当我尝试kafka-console-producer.sh --broker-list DYNAMIC_DNS_ADDR:30192,DYNAMIC_DNS_ADDR:30292,DYNAMIC_DNS_ADDR:30392 --topic twitter_tweets通过 Internet 生成一些数据时,我收到以下错误:

[2018-12-10 09:59:20,772] 错误 向 twitter_tweets 主题发送消息时出错,键:null,值:16 字节,错误:(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache .kafka.common.errors.TimeoutException: twitter_tweets-1 的 1 条记录到期:自批量创建以来已过去 1505 毫秒加上逗留时间 [2018-12-10 09:59:22,273] WARN [Producer clientId=console-producer ] 无法建立与节点 1 的连接。经纪人可能不可用。(org.apache.kafka.clients.NetworkClient)

代理侦听器配置有以下属性:

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9443
advertised.listeners=PLAINTEXT://192.168.0.241:9092,SSL://192.168.0.241:9443

显然,该advertised.listeners属性的每个代理中的 IP 地址都发生了变化。我正在使用CentOS 6.10andKafka 2.0.1进行该设置。远程登录测试有效。另一个指向 Kafka REST 代理端口的转发是通过 Internet 工作并列出所有主题。

4

1 回答 1

2

https://rmoff.net/2018/08/02/kafka-listeners-explained/

您需要两个监听器——一个响应和通告内部地址,一个用于外部地址。

关键是您的客户端连接到的侦听器将返回该侦听器的主机地址和端口

目前,您正在将外部流量欺骗到内部流量,因此您的外部流量正在打击内部侦听器。

您需要这样的东西(根据aws_internal_listener需要更改每个代理的 IP/主机名):

KAFKA_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://192.168.0.241:29092

KAFKA_ADVERTISED_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://DYNAMIC_DNS_ADDR:29092

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: aws_internal_listener:PLAINTEXT,external_listener:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: aws_internal_listener

然后,您的端口转发器DYNAMIC_DNS_ADDR应将连接重定向到 AWS 节点上的 29092。关键是外部连接不应在与内部侦听器匹配的主机上的侦听器端口结束(广告内部192.168.0地址)

用于kafkacat -L -b DYNAMIC_DNS_ADDR:29092调试和验证您的配置,如本文中所述

于 2018-12-10T15:19:30.113 回答