我想在 Kafka 中存储大文件,使用有关记录的元数据在将来检索它们。
因此,我发送包含主题、partition_id、偏移量的消息,然后尝试以这种方式检索文件:
def retrieve_file_from_kafka(topic_name, partition_id, offset):
client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
topic = client.topics[bytes(topic_name, "UTF-8")]
consumer = topic.get_balanced_consumer(
consumer_group=bytes("file_retrieve" + topic_name + str(partition_id) + str(offset), "UTF-8"))
consumer.reset_offsets([(topic.partitions[partition_id], offset)])
return consumer.consume()
但它不起作用,只是打印:
Offset reset for partition 0 to timestamp 8 failed. Setting partition 0's internal counter to 8
这个错误非常神秘,它发生在 reset_offsets 上。当我尝试消费时,该进程然后卡住等待 rebalancing_lock。我究竟做错了什么?