from kafka import KafkaConsumer
from kafka import TopicPartition
TOPIC = "test_topic"
PARTITION = 0
consumer = KafkaConsumer(
group_id=TOPIC,
auto_offset_reset="earliest",
bootstrap_servers="localhost:9092",
request_timeout_ms=100000,
session_timeout_ms=99000,
max_poll_records=100,
)
topic_partition = TopicPartition(TOPIC, PARTITION)
# format: topic, partition
consumer.assign([topic_partition])
consumer.seek(topic_partition, 1660000)
# format: TopicPartition, offset. 1660000 is the offset been set.
for message in consumer:
# do something
- 这只会分配一个分区并为该分区设置偏移量,如果有多个分区,则需要为每个分区分配一个,然后设置偏移量。
- aalmeida88 的答案有时对我有用,在某些情况下,它确实有效,并且 aalmeida88 给了我寻找的想法,它似乎也是一种有用的方法。
- 另一个你可能需要注意的是,当你自己分配分区时,kafka manager 似乎无法获取消费者信息,这可能是因为你在分配分区时将它设置在 kafka 而不是 zookeeper 中,所以 kafka manager 可能没有得到那个信息。希望能帮助到你!
- -编辑 - - -
找到更好的方法来做到这一点。
topic_partition = TopicPartition(TOPIC,
message.partition)
consumer.seek(topic_partition, offset_value)
consumer.commit()
这将从kafka获取的消息中提取分区信息并保存子句以手动分配分区,从而在程序中需要设置多个分区的偏移量(并不罕见)时带来方便。
ps:为了保证一个分区只设置一次,需要根据你的应用设置一个flag。