15

kafka-python (1.0.0) 在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer 和 /usr/bin/kafka-console-consumer 工作正常。

Python应用程序以前也可以正常工作,但是在zookeeper重新启动后,它不再可以连接。

我正在使用文档中的简单示例:

from kafka import KafkaProducer
from kafka.common import KafkaError

producer = KafkaProducer(bootstrap_servers=['hostname:9092'])

# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')

我收到此错误:

Traceback (most recent call last):   File "pp.py", line 4, in <module>
    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])   File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
    self.config['api_version'] = client.check_version()   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
    connect(node_id)   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
    raise Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: 0 Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored

单步执行(/usr/lib/python2.6/site-packages/kafka/client_async.py)时,我注意到第 270 行的评估结果为 false:

270         if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271             if self._can_send_request(node_id):
272                 return True
273         return False

在我的情况下 self._metadata_refresh_in_progress 是假的,但 ttl() = 0;

同时 kafka-console-* 正在愉快地推送消息:

/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2

有什么建议吗?

4

7 回答 7

30

我遇到了同样的问题,上面的解决方案都没有奏效。然后我阅读了异常消息,似乎必须指定 api_version,所以

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,1,0))

注意:(1,0,0)与 kafka 版本匹配的元组1.0.0

工作正常(至少无一例外地完成,现在必须说服它接受消息;))

于 2016-10-27T11:03:29.747 回答
8

我有一个类似的问题。就我而言,代理主机名在客户端无法解析。尝试advertised.host.name在配置文件中显式设置。

于 2016-03-01T10:32:08.070 回答
3

一个主机可以有多个 dns 别名。它们中的任何一个都可以用于 ssh 或 ping 测试。但是 kafka 连接应该使用在代理文件中匹配的advertised.host.name别名server.properties

我在bootstrap_servers参数中使用了不同的别名。因此出现错误。一旦我将调用更改为使用advertised.hostname,问题就解决了

于 2016-03-05T09:14:20.617 回答
3

我有同样的问题。

我用 user3503929 的提示解决了这个问题。

kafka 服务器安装在 windows 上。

服务器属性

...
host.name = 0.0.0.0
...

.

producer = KafkaProducer(bootstrap_servers='192.168.1.3:9092',         
                                         value_serializer=str.encode)
producer.send('test', value='aaa')
producer.close()
print("DONE.")

windows kafka客户端处理没有问题。但是,当我在ubuntu中使用kafka-python向主题发送消息时,会引发异常。NoBrokersAvailable

将以下设置添加到 server.properties。

...
advertised.host.name = 192.168.1.3
...

它在相同的代码中成功运行。为此,我花了三个小时。

谢谢

于 2017-11-03T02:45:16.900 回答
2

使用安装 kafka-pythonpip install kafka-python

创建 kafka 数据管道
的步骤:- 1. 使用 shell 命令运行 Zookeeper 或使用安装 zookeeperd

sudo apt-get install zookeeperd 

这会将 zookeeper 作为守护进程运行,默认监听 2181 端口

  1. 运行卡夫卡服务器
  2. 在不同的控制台上运行带有 producer.py 和 consumer.py 的脚本以查看实时数据。

以下是要运行的命令:-

cd kafka-directory
./bin/zookeeper-server-start.sh  ./config/zookeeper.properties    
./bin/kafka-server-start.sh  ./config/server.properties

现在你已经运行了 zookeeper 和 kafka 服务器,运行 producer.py 脚本和 consumer.py

生产者.py:

from kafka import KafkaProducer 导入时间

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'test'
lines = ["1","2","3","4","5","6","7","8"]
for line in lines:
  try:
    producer.send(topic, bytes(line, "UTF-8")).get(timeout=10)
  except IndexError as e:
    print(e)
  continue

消费者.py:-

from kafka import KafkaConsumer
topic = 'test'
consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    # print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    #                                       message.offset, message.key,
    #                                       message.value))
    print(message)

现在在不同的终端运行 producer.py 和 consumer.py 以查看实时数据..!

注意:上面的 producer.py 脚本只运行一次就可以永久运行,使用 while 循环并使用 time 模块。

于 2018-03-10T17:35:34.523 回答
1

我遇到了类似的问题,从 bootstrap_servers 中删除端口有所帮助。

consumer = KafkaConsumer('my_topic',
                     #group_id='x',
                     bootstrap_servers='kafka.com')
于 2016-05-31T22:24:49.430 回答
0

在您的 server.properties 文件中,确保将侦听器 IP 设置为远程机器可以访问的框 IP 地址。默认情况下它是本地主机

更新 server.properties 中的这一行:

listeners=PLAINTEXT://<Your-IP-address>:9092

还要确保您没有可能阻止其他 IP 地址与您联系的防火墙。如果你有 sudo 特权。尝试禁用防火墙。

sudo systemctl stop firewalld
于 2017-08-22T12:51:51.710 回答