2

我有一个很大的 csv,我想写一个 kafka 主题。

def producer():
    producer = KafkaProducer(bootstrap_servers='mykafka-broker')
    with open('/home/antonis/repos/testfile.csv') as file:
        reader = csv.DictReader(file, delimiter=";")
        for row in reader:
            producer.send(topic='stable_topic', value=row)
            producer.flush()

if __name__ == '__main__':
    producer()

此代码产生错误:

AssertionError: value must be bytes

该文件如下所示:

"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22

谁能帮我这个?

4

2 回答 2

2

与其重新发明轮子,不如使用已经存在的非常好的一个:) 它是Kafka Connect,它是 Apache Kafka 的一部分。

有几个可以从 CSV 读取的连接器,包括Kafka Connect spooldir(参见示例)和Filepulse

在本次演讲中了解有关 Kafka Connect 的更多信息。

于 2020-05-27T10:11:27.630 回答
2

您需要正确序列化您的值。


以下应该可以解决问题:

import json  

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
于 2020-05-27T10:12:36.810 回答