2

我正在尝试为 python kafka 创建一个简单的函数,但在将字符串循环传递给 producer.send_messages 方法时遇到了麻烦。

from kafka import SimpleProducer, KafkaClient
import random
import os

kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
x = 0

while x!=1:
        #producer.send_messages(b'test1',b'str(i)') #the program will run with this type of input
        a=random.random()
        message_method= "b"+"'"+topic+"'"+","+"b"+"'"+str(a)+"'"
        producer.send_messages(message_method)

我会以这个错误结束,

 File "<stdin>", line 4, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/producer/simple.py", line 52, in send_messages
    partition = self._next_partition(topic)
  File "/usr/lib/python2.7/site-packages/kafka/producer/simple.py", line 36, in _next_partition
    self.client.load_metadata_for_topics(topic)
  File "/usr/lib/python2.7/site-packages/kafka/client.py", line 378, in load_metadata_for_topics
    kafka.common.check_error(topic_metadata)
  File "/usr/lib/python2.7/site-packages/kafka/common.py", line 233, in check_error
    raise error_class(response)
kafka.common.UnknownError: TopicMetadata(topic="b'test1',b'0.595202345912'", error=17, partitions=[])

我很欣赏任何建议,以使其成为一种更动态的方式(无需硬编码 b'messages'...等)以将消息也放入 kafka。:D

4

1 回答 1

0

您应该以这种方式使用 send_message 函数:

producer.send_messages('test1', str(a))

我觉得比较合适。

于 2015-10-20T08:21:56.980 回答