0

我有equipment_id那个可以1, 5, 7 or 10,我希望它们被放置在不同的分区中。我怎样才能做到这一点?我是否必须首先创建适当数量的分区(在本例中为 4 个)?如果是这样,下一步是什么?

import json
import random

from confluent_kafka.cimpl import Producer


def delivery_callback(err, msg):
    if err:
        print('%% Message failed delivery: %s\n' % err)
    else:
        print('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset()))


p = Producer({'bootstrap.servers': 'localhost:9092'})
p.poll(0)
equipment_id = random.choice([1, 5, 7, 10])
message = {'equipment_id': equipment_id, "value": random.randint(1, 100)}
p.produce(topic='data', value=json.dumps(message).encode(), key=str(equipment_id), callback=delivery_callback)
p.flush()

方法中有partition参数produce()。但它接受分区的序列号:0, 1, 2, 3. 如果我str(equipment_id)key参数定义它总是将数据发送到分区 0(根据输出delivery_callback())。

4

0 回答 0