0

我在 python 的 pykafka 中使用 KafkaClient。我正在尝试读取一个文本文件并将其行生成一个主题,然后由消费者读取。然而,在运行时,它只读取消息中的单个字母,而不是文本文件的单词或行。我究竟做错了什么?

我的制片人是

from pykafka import KafkaClient

text = open('filename.txt','r').read()
text = text.split()
client = KafkaClient(hosts='localhost:9099')
topic = client.topics['topic']
producer = topic.get_sync_producer()
for i in text:
     producer.produce(i.encode('ascii'))

我的消费者是

from pykafka import KafkaClient
client = KafkaClient(hosts='localhost:9099')
topic = client.topics['topic']
consumer = topic.get_simple_consumer()
for message in consumer:
    if message is not None:
        print(message.offset, message.value.decode())

将不胜感激一些指针。我想知道这是否是读取文本文件并通过 kafka 运行的最佳方式。

4

1 回答 1

0

通过将库切换到 kafka 库,我得到了所需的输出。

制片人

from kafka import KafkaProducer

producer = 
KafkaProducer(bootstrap_servers='localhost:9099')

with open('filename.txt') 
as f:
    for line in f:
      
     producer.send('topic',line.encode('ascii'))

 producer.flush()
 producer.close()

消费者

from kafka import KafkaConsumer

consumer = 

KafkaConsumer('topic',bootstrap_servers='localhost:9099')

for msg in consumer:
    print(msg.value.decode())
于 2020-08-27T13:10:23.123 回答