kafka-python (1.0.0) throws an error when connecting to the broker, while /usr/bin/kafka-console-producer and /usr/bin/kafka-console-consumer work fine.
The Python application used to work as well, but after a zookeeper restart it can no longer connect.
I am using the simple example from the documentation:
from kafka import KafkaProducer
from kafka.common import KafkaError
producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')
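To rule out a plain network/firewall problem, one sanity check I can run from the same box is a raw socket connect (nothing kafka-specific; 'hostname' stands in for the real broker name, same as above):

import socket
# Plain TCP check against the broker port; this only proves the port accepts
# connections, not that the Kafka protocol handshake works.
sock = socket.create_connection(('hostname', 9092), timeout=5)
print('TCP connect to hostname:9092 OK')
sock.close()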
I get this error:
Traceback (most recent call last):
  File "pp.py", line 4, in <module>
    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
  File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
    self.config['api_version'] = client.check_version()
  File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
    connect(node_id)
  File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
    raise Errors.NodeNotReadyError(node_id)
kafka.common.NodeNotReadyError: 0
Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored
While stepping through /usr/lib/python2.6/site-packages/kafka/client_async.py, I noticed that the condition at line 270 evaluates to false:
270 if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271 if self._can_send_request(node_id):
272 return True
273 return False
In my case self._metadata_refresh_in_progress is False, but ttl() = 0, so the condition at line 270 is false and the method falls through to return False at line 273.
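To get more detail on what the client is doing around the bootstrap and metadata refresh, I can enable kafka-python's debug output via the standard logging module:

import logging
# kafka-python logs connection attempts and metadata refreshes at DEBUG level,
# which should show where the bootstrap stalls after the zookeeper restart.
logging.basicConfig(level=logging.DEBUG)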
Meanwhile kafka-console-* is happily pushing messages:
/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2
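One workaround I am considering (not sure it is the right fix) is pinning api_version so the failing check_version() probe is skipped entirely; my understanding is that kafka-python 1.0.0 takes it as a string while newer releases use a tuple, but I may be wrong about the exact form:

from kafka import KafkaProducer
# Assumption: pinning api_version avoids the broker-version probe that raises
# NodeNotReadyError above. '0.9' is a guess at my broker's version; adjust as needed.
producer = KafkaProducer(bootstrap_servers=['hostname:9092'], api_version='0.9')
future = producer.send('test-topic', b'raw_bytes')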
Any suggestions?