1

我正在调试一个简单的应用程序:

import faust

app = faust.App('app08')

# want to start from the beginning of the 
# topic every time the application restarts
@app.agent(topic) 
async def process(stream):
    async for event in stream:
        print(event)

并希望在重新启动此应用程序时让代理从最早的偏移量中读取。现在,它很聪明,知道最后一条消息的读取位置,并在重新启动时从该位置开始。尽管搜索了一段时间的文档,但我找不到如何执行此操作的示例。我知道如何做到这一点的唯一方法是更改​​应用程序名称,例如:更改app08app09.

4

1 回答 1

2

请记住,偏移量是由 Kafka 服务器使用与您的 faust 应用程序同名的消费者组控制的,我一直在使用kafaka-consumer-groupsCLI(您的 kafka 安装的一部分)来执行此操作。

kafka-consumer-groups --bootstrap-server kafka_bootstrap --reset-offsets --to-earliest --group faust_appname --execute --all-topics

如果您运行的是相对较新的 Kafka 版本,您还可以替换--to-earliest--to-datetime并提供格式中的时间戳。2020-09-20T00:00:00.00

如果您想自动执行此操作,我还有 Python API 可以自动控制消费者组。

于 2020-09-24T13:58:00.823 回答