我正在尝试为 python kafka 创建一个简单的函数,但在将字符串循环传递给 producer.send_messages 方法时遇到了麻烦。
from kafka import SimpleProducer, KafkaClient
import random
import os
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
x = 0
while x!=1:
#producer.send_messages(b'test1',b'str(i)') #the program will run with this type of input
a=random.random()
message_method= "b"+"'"+topic+"'"+","+"b"+"'"+str(a)+"'"
producer.send_messages(message_method)
我会以这个错误结束,
File "<stdin>", line 4, in <module>
File "/usr/lib/python2.7/site-packages/kafka/producer/simple.py", line 52, in send_messages
partition = self._next_partition(topic)
File "/usr/lib/python2.7/site-packages/kafka/producer/simple.py", line 36, in _next_partition
self.client.load_metadata_for_topics(topic)
File "/usr/lib/python2.7/site-packages/kafka/client.py", line 378, in load_metadata_for_topics
kafka.common.check_error(topic_metadata)
File "/usr/lib/python2.7/site-packages/kafka/common.py", line 233, in check_error
raise error_class(response)
kafka.common.UnknownError: TopicMetadata(topic="b'test1',b'0.595202345912'", error=17, partitions=[])
我很欣赏任何建议,以使其成为一种更动态的方式(无需硬编码 b'messages'...等)以将消息也放入 kafka。:D