0

我正在使用 pykafka 库在 Kafka 上发布消息。我的数据集是 JSON

{"user": "jpoole", "created_at_unixtime": 1440407147.033846, "id": 3600730356622213650, "text": "Techical support for my new computer as A+, thank you @fudgemart", "created_at": "Mon Aug 24 05:05:47 +0000 2015"} 
]

我的要求是生成 2 条 kafka 消息,使用 PyKafka 为上面的每个 JSON 字符串生成 1 条。到目前为止,我已经尝试过以下方法。

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']

with open('./tweets.json') as f:
    dataItems =json.load(f)

s=json.dumps(dataItems).encode('utf-8')


with topic.get_sync_producer() as producer:
    for data in s:
        producer.produce(data)

我已将 JSON 加载到文件中(我最初的要求)。上面的代码有效,但它没有将第一个 JSON 字符串作为一个整体,而是将字符串中的每个字符都作为一条消息。

我的要求是将每个 JSON 字符串作为单独的 Kafka 消息发布。

Message 1
{"user": "jpoole", "created_at_unixtime": 1448221456.6646008, "id": 3731785240073317438, "text": "Glad I bought my electronics from @fudgemart", "created_at": "Sun Nov 22 14:44:16 +0000 2015"}

Message 2
{"user": "jpoole", "created_at_unixtime": 1440407147.033846, "id": 3600730356622213650, "text": "Techical support for my new computer as A+, thank you @fudgemart", "created_at": "Mon Aug 24 05:05:47 +0000 2015"}

谢谢

4

1 回答 1

0

没有任何日志很难说,但我认为问题可能出在for data in s:.

JSON.dumps()产生一个字符串,所以s是一个字符串,并且data是一个字符。所以问题是你要分别制作每个角色。

于 2021-05-21T18:39:19.043 回答