我有equipment_id
那个可以1, 5, 7 or 10
,我希望它们被放置在不同的分区中。我怎样才能做到这一点?我是否必须首先创建适当数量的分区(在本例中为 4 个)?如果是这样,下一步是什么?
import json
import random
from confluent_kafka.cimpl import Producer
def delivery_callback(err, msg):
if err:
print('%% Message failed delivery: %s\n' % err)
else:
print('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset()))
p = Producer({'bootstrap.servers': 'localhost:9092'})
p.poll(0)
equipment_id = random.choice([1, 5, 7, 10])
message = {'equipment_id': equipment_id, "value": random.randint(1, 100)}
p.produce(topic='data', value=json.dumps(message).encode(), key=str(equipment_id), callback=delivery_callback)
p.flush()
方法中有partition
参数produce()
。但它接受分区的序列号:0, 1, 2, 3
. 如果我str(equipment_id)
为key
参数定义它总是将数据发送到分区 0(根据输出delivery_callback()
)。