0

我有一个 Kafka 主题,我们称之为摄取,它每秒接收一个条目x。我有一个要在此数据上运行的进程,但它必须一次运行 100 个事件。因此,我想将这些条目批处理在一起并将它们发送到一个名为batched-ingest的新主题。这两个主题将如下所示...

ingest = [entry, entry, entry, ...]

batched-ingest = [[entry_0, entry_1, ..., entry_99]]

使用浮士德的正确方法是什么?我现在的解决方案是这个......

app = faust.App("explore", value_serializer="raw")
ingest = app.topic('ingest')
ingest_batch = app.topic('ingest-batch')


@app.agent(ingest, sink=[ingest_batch])
async def test(stream):
    async for values in stream.take(10, within=1000):
        yield values

我不确定这是否是《浮士德》中这样做的正确方法。如果是这样,我应该设置什么within以使其始终等到len(values) = 100

4

1 回答 1

0

faust take 文档within中所述,如果您从代码中省略 ,take(100, within=10)则如果有 99 条消息并且从未收到最后 100 条消息,则代码将永远阻塞。为了解决这个问题,添加一个超时时间,以便在 10 秒内处理多达 100 个值。因此,如果有 10 秒的时间段没有收到任何事件,它仍然会处理它收集的内容。

于 2021-07-06T16:11:00.167 回答