5

我看到 PyKafka 的异常行为,这是我最近才开始使用的客户端。

错误如下:

Failed to connect newly created broker for b'4758e4ee1af6':9092
{0: <pykafka.broker.Broker at 0x7f319e19be10 (host=b'4758e4ee1af6',port=9092, id=0)>}

错误的根源在于以下几行:

    self.client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
consumer = self.client.topics[bytes(self.input_topic,"UTF-8")].get_balanced_consumer(
        consumer_group=bytes(self.consumer_group,"UTF-8"),
        auto_commit_enable=True
    )

调试我看到客户端使用正确的字符串 IP 连接到种子代理但是当检索到代理列表时,他们的 IP 是二进制的,当 PyKafka 尝试再次连接以创建消费者时,这些 IP 显然不起作用.

另一个可能相关的问题是我需要自己将主题名称和消费者组名称转换为字节(与其他客户端一样),但文档中的所有示例都显示了字符串的用法。

Kafka 代理版本:0.10.1.0 PyKafka 版本:2.7.0

4

2 回答 2

3

好吧,我完全被误导了:那不是 IP,而是 base64 中的主机名(由 Docker 生成)。

于 2018-05-14T08:42:37.883 回答
1

检查代理的advertised.listeners配置 - 它定义了在 pykafkaCluster初始化期间将发送到 ZooKeeper 并转发到 pykafka 客户端的主机名。Docker 可能会破坏此信息,因此您需要使用advertised.listeners. 从文档中:

listeners如果与配置属性不同,则发布到 ZooKeeper 以供客户端使用的侦听器。在 IaaS 环境中,这可能需要与代理绑定的接口不同。

至于字节/字符串问题,pykafka的最新开发版本接受主题和消费者组名称的字符串或字节,以方便程序员。对于旧版本,您需要使用以下技术将字符串参数转换为字节:

topic_name = str_topic_name.encode('ascii')
于 2018-05-14T23:04:14.860 回答