3

从浮士德文档中,我无法找到如何将消费者设置为特定的偏移量。

使用 confluent-kafka 我使用 consumer.offsets_for_times 来查找 start_offset,然后将 TopicPartition 分配给该特定偏移量,例如:

start_offset = consumer.offsets_for_times([
    TopicPartition("prediction.OfferPredictionCheckpoint", 0, int(start_date)),
    TopicPartition("prediction.OfferPredictionCheckpoint", 1, int(start_date)),
])

consumer.assign([
    TopicPartition("prediction.OfferPredictionCheckpoint", partition_number, pos)
])

对于浮士德,我找不到更多的东西:

consumer_auto_offset_reset

这只让你设置最早或最晚。我将如何从特定时间或一天的开始开始阅读?

4

2 回答 2

1

我认为这可能是您正在寻找的:https ://faust.readthedocs.io/en/latest/reference/faust.transport.consumer.html#faust.transport.consumer.Consumer.seek

它可以转到特定的偏移量,但是我认为没有一种简单的方法可以告诉它在没有一些额外逻辑的情况下转到特定的时间或日期(也许使用偏移量进行二进制搜索?)。

于 2020-02-19T18:25:52.167 回答
0

要将偏移量设置为特定值,您可以使用这些示例。在这里,我将偏移量设置为 50000。每次启动我的应用程序时,代理都会在偏移量 50000 处开始读取。为此,我使用app.consumer.seek

这里 tp 有两个参数,在这种情况下是 topic - test 和 0 ,它是分区号。更多信息faust.types

from faust.types import TP, Message

tp = TP("test", 0)
topic = app.topic(tp.topic)

@app.task()
async def on_start():
    await app.consumer.seek(tp, 50000)
    print("App startet")

@app.agent(topic)
async def receive(stream):
    async for event in stream.events():
        print((event.message.offset, event.value))
于 2021-05-28T09:27:18.490 回答