I am using the pykafka library to publish messages to Kafka. My dataset is JSON:
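[
{"user": "jpoole", "created_at_unixtime": 1448221456.6646008, "id": 3731785240073317438, "text": "Glad I bought my electronics from @fudgemart", "created_at": "Sun Nov 22 14:44:16 +0000 2015"},
{"user": "jpoole", "created_at_unixtime": 1440407147.033846, "id": 3600730356622213650, "text": "Techical support for my new computer as A+, thank you @fudgemart", "created_at": "Mon Aug 24 05:05:47 +0000 2015"}
]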
My requirement is to produce 2 Kafka messages with PyKafka, one for each JSON string above. So far I have tried the following:
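    import json

    from pykafka import KafkaClient

    client = KafkaClient(hosts="127.0.0.1:9092")
    topic = client.topics['test']

    # Load the JSON array from the file
    with open('./tweets.json') as f:
        dataItems = json.load(f)

    # Serialize the whole list back into a single encoded string
    s = json.dumps(dataItems).encode('utf-8')

    with topic.get_sync_producer() as producer:
        # This iterates over the serialized string itself, not over the records
        for data in s:
            producer.produce(data)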
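I have the JSON stored in a file (my original requirement). The code above runs, but instead of sending the first JSON string as a whole, it sends every character of the string as a separate message.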
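The per-character behavior most likely comes from what the loop iterates over: `json.dumps(dataItems)` turns the whole list into a single string, and looping over a string yields its characters. A minimal illustration, using hypothetical stand-in records rather than the real tweets:

```python
import json

# Hypothetical stand-in records, not the real dataset.
items = [{"user": "jpoole"}, {"user": "fudgemart"}]

# json.dumps on the whole list produces one string for the entire array.
serialized = json.dumps(items)

# Looping over that string walks it character by character.
first_chars = [ch for ch in serialized][:3]

# Looping over the list instead yields one record per iteration.
per_record = [json.dumps(item) for item in items]
```

Here `first_chars` holds single characters from the start of the array text, while `per_record` holds one complete JSON string per record.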
What I need is for each JSON string to be published as a separate Kafka message:
Message 1
{"user": "jpoole", "created_at_unixtime": 1448221456.6646008, "id": 3731785240073317438, "text": "Glad I bought my electronics from @fudgemart", "created_at": "Sun Nov 22 14:44:16 +0000 2015"}
Message 2
{"user": "jpoole", "created_at_unixtime": 1440407147.033846, "id": 3600730356622213650, "text": "Techical support for my new computer as A+, thank you @fudgemart", "created_at": "Mon Aug 24 05:05:47 +0000 2015"}
Thanks.
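One possible way to get one message per record is to iterate over the parsed list and serialize each item on its own. The sketch below assumes the file holds a JSON array of objects; the helper names `encode_records`, `publish_records`, and `main` are hypothetical, and only `KafkaClient`, `client.topics[...]`, `get_sync_producer()`, and `produce()` are taken from the code in the question.

```python
import json


def encode_records(items):
    """Serialize each item of a list to its own UTF-8 encoded JSON payload."""
    return [json.dumps(item).encode("utf-8") for item in items]


def publish_records(producer, items):
    """Send one message per item; `producer` only needs a produce(bytes) method."""
    payloads = encode_records(items)
    for payload in payloads:
        producer.produce(payload)
    return len(payloads)


def main(path="./tweets.json", hosts="127.0.0.1:9092", topic_name="test"):
    # Imported here so the helpers above can be used without pykafka installed.
    from pykafka import KafkaClient

    with open(path) as f:
        items = json.load(f)  # assumed to be a JSON array of objects

    client = KafkaClient(hosts=hosts)
    # Assumption: depending on the pykafka version, the topic name
    # may need to be bytes (b"test") instead of str.
    topic = client.topics[topic_name]
    with topic.get_sync_producer() as producer:
        publish_records(producer, items)
```

Calling `main()` against a running broker should then publish one message per array element. Keeping the serialization in `publish_records` means it can be checked with any stand-in object that has a `produce` method, without a Kafka connection.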