当试图让一个AIOKafkaConsumer
从特定偏移量开始读取消息时starting_offset
,我们如何知道要使用哪个分区?
我正在尝试使用该AIOKafkaConsumer.seek
方法,但它需要指定一个TopicPartition。
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
async def main():
topic = "test"
starting_offset = 3
# Publish some messages
producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
await producer.start()
for i in range(10):
await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))
# Start consuming from a specific offset
consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
await consumer.start()
consumer.seek(None, starting_offset)
while True:
message = await consumer.getone()
print("message:", message.value)
if __name__ == "__main__":
asyncio.run(main())