-1

当试图让一个AIOKafkaConsumer从特定偏移量开始读取消息时starting_offset,我们如何知道要使用哪个分区?

我正在尝试使用该AIOKafkaConsumer.seek方法,但它需要指定一个TopicPartition

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def main():
    topic = "test"
    starting_offset = 3
    
    # Publish some messages
    producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
    await producer.start()
    for i in range(10):
        await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))

    # Start consuming from a specific offset
    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    consumer.seek(None, starting_offset)

    while True:
        message = await consumer.getone()
        print("message:", message.value)


if __name__ == "__main__":
    asyncio.run(main())
4

1 回答 1

1

您的主题是否只有一个分区?如果是这样,那么使用1... 否则,没有直接的答案。

分区具有单独的偏移值。您可以将所有分区查找到相同的偏移量,但这不能保证所有分区都存在,您首先需要遍历分区号范围(请参阅 参考资料partitions_for_topic(topic))以单独查找它们。

于 2021-08-05T14:56:23.977 回答